Skip to main content

dvb_si/
epg.rs

1//! EPG convenience layer.
2//!
3//! This module provides a high-level store for EIT (Event Information Table) data,
4//! making it easy to query the "now and next" events for a service and export
5//! a full EPG schedule.
6//!
7//! It wraps [`crate::collect::EitCollector`] to handle the multi-section
8//! reassembly of EIT tables and maintains a deduplicated, time-ordered event list
9//! for each service keyed by `(original_network_id, transport_stream_id, service_id)`.
10//!
11//! # Quickstart
12//!
13//! ```rust
14//! use dvb_si::epg::{EpgStore, ServiceKey};
15//! use dvb_si::collect::SectionSetCollector;
16//! use chrono::{TimeZone, Utc};
17//!
18//! let mut store = EpgStore::new();
19//!
20//! // Feed EIT sections (from a TS demux, file, etc.)
21//! // store.feed(&eit_section_bytes)?;
22//!
23//! let key = ServiceKey {
24//!     original_network_id: 1,
25//!     transport_stream_id: 1,
26//!     service_id: 100,
27//! };
28//!
29//! // Query now/next (requires EIT present/following data to be fed first)
30//! let now = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
31//! let (now_evt, next_evt) = store.now_and_next(key, now);
32//! if let Some(evt) = now_evt {
33//!     println!("Now:  {} (until {})",
34//!         evt.event_name.as_deref().unwrap_or("?"),
35//!         evt.start_time.map(|t| t + evt.duration.unwrap_or_default())
36//!             .map(|e| e.to_string()).unwrap_or_default());
37//! }
38//! if let Some(evt) = next_evt {
39//!     println!("Next: {} at {}",
40//!         evt.event_name.as_deref().unwrap_or("?"),
41//!         evt.start_time.map(|t| t.to_string()).unwrap_or_default());
42//! }
43//! // Print tonight's schedule (events from 20:00 to midnight)
44//! let tonight = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
45//! let midnight = Utc.with_ymd_and_hms(2026, 6, 11, 0, 0, 0).unwrap();
46//! if let Some(events) = store.schedule(key, tonight, midnight) {
47//!     for evt in &events {
48//!         println!("{:>5}  {}",
49//!             evt.start_time.map(|t| t.format("%H:%M").to_string()).unwrap_or_default(),
50//!             evt.event_name.as_deref().unwrap_or("?"));
51//!     }
52//! }
53//! ```
54//!
55//! # Pruning policy
56//!
57//! The store accumulates events indefinitely. To bound growth under schedule
58//! churn, use [`EpgStore::retain_services`] to remove services that are no longer
59//! of interest and [`EpgStore::clear`] to reset all state at a carousel boundary.
60//! The underlying [`crate::collect::EitCollector`] handles version-driven
61//! section-set replacement automatically — when the version_number on a
62//! sub-table changes, the old partial set is discarded and a new one begins.
63//! Callers scanning a full carousel cycle can `clear()` and start fresh, or
64//! `retain_services` to keep only the active service list.
65
66use crate::collect::CollectResult;
67use std::collections::HashMap;
68
69/// Logical key identifying a service across the DVB network.
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
71#[cfg_attr(feature = "serde", derive(serde::Serialize))]
72#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
73pub struct ServiceKey {
74    /// original_network_id from the EIT/SDT.
75    pub original_network_id: u16,
76    /// transport_stream_id from the EIT/SDT.
77    pub transport_stream_id: u16,
78    /// service_id.
79    pub service_id: u16,
80}
81
82/// A decoded view of an EPG event.
83///
84/// Extracted from [`crate::collect::CompleteEitEvent`] with commonly needed
85/// descriptor fields pre-decoded and extended text concatenated per
86/// EN 300 468 §6.2.15.
87///
88/// # Limitations
89///
90/// Only the first descriptor of each kind (short_event, content,
91/// parental_rating, content_identifier) is decoded per event; EIT events
92/// may carry multiple language variants and only the first is taken.
93#[derive(Debug, Clone, PartialEq, Eq)]
94#[cfg_attr(feature = "serde", derive(serde::Serialize))]
95#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
96pub struct EpgEvent {
97    /// 16-bit event_id.
98    pub event_id: u16,
99    /// Decoded start time (MJD + BCD UTC), if valid.
100    pub start_time: Option<chrono::DateTime<chrono::Utc>>,
101    /// Decoded BCD duration, if valid.
102    pub duration: Option<core::time::Duration>,
103    /// 3-bit running status.
104    pub running_status: u8,
105    /// free_CA_mode.
106    pub free_ca_mode: bool,
107    /// Decoded short event name (from
108    /// [`ShortEventDescriptor`](crate::descriptors::short_event::ShortEventDescriptor)),
109    /// if present and decodeable.
110    pub event_name: Option<String>,
111    /// Decoded short event text, if present and decodeable.
112    pub event_text: Option<String>,
113    /// Concatenated extended event text from all
114    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
115    /// fragments, per EN 300 468 §6.2.15. Fragments are sorted by
116    /// `descriptor_number` and concatenated directly (no separator).
117    pub extended_text: Option<String>,
118    /// Accumulated extended event items (description, value) from all
119    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
120    /// fragments, sorted by `descriptor_number`.
121    #[cfg_attr(feature = "serde", serde(default))]
122    pub extended_items: Vec<(String, String)>,
123    /// Content genre entries (nibble_1, nibble_2, user_byte) from
124    /// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
125    #[cfg_attr(feature = "serde", serde(default))]
126    pub content_nibbles: Vec<(u8, u8, u8)>,
127    /// Parental rating entries (country_code, rating) from
128    /// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
129    #[cfg_attr(feature = "serde", serde(default))]
130    pub ratings: Vec<(String, u8)>,
131    /// CRID entries (crid_type, inline_crid) from
132    /// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
133    #[cfg_attr(feature = "serde", serde(default))]
134    pub crids: Vec<(u8, String)>,
135}
136
137/// Serialisable service data exposed by [`EpgStore`] serde export.
138#[derive(Debug, Clone)]
139#[cfg_attr(feature = "serde", derive(serde::Serialize))]
140#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
141struct ServiceData {
142    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
143    service_name: Option<String>,
144    events: Vec<EpgEvent>,
145}
146
147/// A store for EIT data, providing high-level access to program events.
148///
149/// Wraps a [`crate::collect::EitCollector`] and maintains a deduplicated,
150/// event list per service. Optionally accepts SDT data to attach service names.
151///
152/// Serde export serializes the cache as a map of `ServiceKey` → service data.
153///
154/// # Limitations
155///
156/// Events are deduplicated by (start_time, event_id) and never evicted
157/// once stored: events removed from a re-versioned schedule, or events
158/// already in the past, remain in the store. Call [`clear()`](Self::clear)
159/// or [`retain_services()`](Self::retain_services) to bound long-running
160/// memory.
161///
162/// # Example
163///
164/// ```no_run
165/// use dvb_si::epg::{EpgStore, ServiceKey};
166/// use chrono::Utc;
167///
168/// let mut store = EpgStore::new();
169/// // store.feed(&eit_section_bytes).unwrap();
170///
171/// let key = ServiceKey {
172///     original_network_id: 1,
173///     transport_stream_id: 1,
174///     service_id: 100,
175/// };
176///
177/// let (now_evt, _next) = store.now_and_next(key, Utc::now());
178/// if let Some(e) = now_evt {
179///     println!("Now playing: {:?}", e.event_name);
180/// }
181/// ```
182#[derive(Debug, Default)]
183pub struct EpgStore {
184    collector: crate::collect::EitCollector,
185    cache: HashMap<ServiceKey, ServiceEpg>,
186}
187
188#[derive(Debug, Default)]
189struct ServiceEpg {
190    service_name: Option<String>,
191    /// Deduplicated by event_id. Latest version wins (later inserts overwrite).
192    events: HashMap<u16, EpgEvent>,
193}
194
195impl EpgStore {
196    /// Create a new, empty EPG store.
197    #[must_use]
198    pub fn new() -> Self {
199        Self::default()
200    }
201
202    /// Feed one EIT section into the store.
203    ///
204    /// If a table becomes complete, its events are merged into the cache
205    /// (deduplicated by `event_id`, later insertions overwrite).
206    ///
207    /// # Errors
208    ///
209    /// Returns a [`crate::collect::CollectError`] if the section is malformed.
210    pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
211        self.feed_with_pid(None, bytes)
212    }
213
214    /// Feed one EIT section with PID context into the store.
215    pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
216        if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
217            let tables = completed.tables()?;
218            for table in &tables {
219                let key = ServiceKey {
220                    original_network_id: table.original_network_id,
221                    transport_stream_id: table.transport_stream_id,
222                    service_id: table.service_id,
223                };
224                let svc = self.cache.entry(key).or_default();
225                for event in &table.events {
226                    svc.events.insert(event.event_id, event_to_epg(event));
227                }
228            }
229        }
230        Ok(())
231    }
232
233    /// Feed completed SDT data to attach service names.
234    ///
235    /// Accepts a parsed [`crate::collect::CompleteSdt`] from a
236    /// [`crate::collect::SectionSetCollector`].
237    pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
238        for svc in &sdt.services {
239            let key = ServiceKey {
240                original_network_id: sdt.original_network_id,
241                transport_stream_id: sdt.transport_stream_id,
242                service_id: svc.service_id,
243            };
244            let entry = self.cache.entry(key).or_default();
245            entry.service_name = extract_service_name(svc.descriptors.descriptors());
246        }
247    }
248
249    /// Get the "now" and "next" events for a service.
250    ///
251    /// Searches the event list for the given service and returns the event
252    /// currently on-air ("now") and the next upcoming event ("next") based
253    /// on reference time `at`.
254    ///
255    /// "now" is the event where `at` falls within `[start, start + duration)`.
256    /// "next" is the earliest event where `start > at`.
257    ///
258    /// An event ending exactly at `at` is NOT considered "now" (exclusive end).
259    ///
260    /// Returns `(None, None)` when the service is unknown or no event matches.
261    pub fn now_and_next(
262        &self,
263        key: ServiceKey,
264        at: chrono::DateTime<chrono::Utc>,
265    ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
266        let Some(svc) = self.cache.get(&key) else {
267            return (None, None);
268        };
269
270        let now = svc.events.values().find(|e| {
271            if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
272                let end = start + dur;
273                return at >= start && at < end;
274            }
275            false
276        });
277
278        let next = svc.events.values().find(|e| {
279            if let Some(start) = e.start_time {
280                return start > at;
281            }
282            false
283        });
284
285        (now, next)
286    }
287
288    /// Query events with start times in the half-open range `[from, to)`.
289    ///
290    /// Returns events sorted by start time (valid times first, then by
291    /// event_id). Events without a decodable start time are excluded.
292    #[must_use]
293    pub fn schedule(
294        &self,
295        key: ServiceKey,
296        from: chrono::DateTime<chrono::Utc>,
297        to: chrono::DateTime<chrono::Utc>,
298    ) -> Option<Vec<&EpgEvent>> {
299        let svc = self.cache.get(&key)?;
300        let mut events: Vec<&EpgEvent> = svc
301            .events
302            .values()
303            .filter(|e| {
304                if let Some(start) = e.start_time {
305                    start >= from && start < to
306                } else {
307                    false
308                }
309            })
310            .collect();
311        events.sort_by_key(|a| a.start_time.unwrap());
312        Some(events)
313    }
314
315    /// Return the service name for a given key, if SDT data was fed.
316    #[must_use]
317    pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
318        self.cache.get(&key).and_then(|s| s.service_name.as_deref())
319    }
320
321    /// Iterate the [`ServiceKey`]s of every service with cached EIT data, so
322    /// callers can walk the whole EPG (e.g. render a grid) without knowing the
323    /// service ids in advance. Order is unspecified.
324    pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
325        self.cache.keys().copied()
326    }
327
328    /// Return all events for a service, sorted by start time
329    /// (events without a valid start time sort last, then by event_id).
330    #[must_use]
331    pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
332        let svc = self.cache.get(&key)?;
333        let mut events: Vec<&EpgEvent> = svc.events.values().collect();
334        events.sort_by(|a, b| match (a.start_time, b.start_time) {
335            (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
336            (Some(_), None) => std::cmp::Ordering::Less,
337            (None, Some(_)) => std::cmp::Ordering::Greater,
338            (None, None) => a.event_id.cmp(&b.event_id),
339        });
340        Some(events)
341    }
342
343    /// Return the number of services with cached EIT data.
344    #[must_use]
345    pub fn service_count(&self) -> usize {
346        self.cache.len()
347    }
348
349    /// Return the total number of events across all services.
350    #[must_use]
351    pub fn event_count(&self) -> usize {
352        self.cache.values().map(|s| s.events.len()).sum()
353    }
354
355    /// Retain only services matching the given predicate.
356    ///
357    /// Both the event cache and the underlying collector partial state
358    /// for rejected keys are removed.
359    pub fn retain_services<F>(&mut self, mut keep: F)
360    where
361        F: FnMut(&ServiceKey) -> bool,
362    {
363        self.cache.retain(|key, _| keep(key));
364        self.collector.retain_logical(|lk| {
365            keep(&ServiceKey {
366                original_network_id: lk.original_network_id,
367                transport_stream_id: lk.transport_stream_id,
368                service_id: lk.service_id,
369            })
370        });
371    }
372
373    /// Clear all cached EIT data and reset the internal collector.
374    pub fn clear(&mut self) {
375        self.collector.clear();
376        self.cache.clear();
377    }
378}
379
380#[cfg(feature = "serde")]
381impl serde::Serialize for EpgStore {
382    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
383        use serde::ser::SerializeMap;
384        let mut m = s.serialize_map(Some(self.cache.len()))?;
385        for (key, svc) in &self.cache {
386            let data = ServiceData {
387                service_name: svc.service_name.clone(),
388                events: {
389                    let mut evts: Vec<EpgEvent> = svc.events.values().cloned().collect();
390                    evts.sort_by(|a, b| match (a.start_time, b.start_time) {
391                        (Some(at), Some(bt)) => {
392                            at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id))
393                        }
394                        (Some(_), None) => std::cmp::Ordering::Less,
395                        (None, Some(_)) => std::cmp::Ordering::Greater,
396                        (None, None) => a.event_id.cmp(&b.event_id),
397                    });
398                    evts
399                },
400            };
401            let key_str = format!(
402                "{}-{}-{}",
403                key.original_network_id, key.transport_stream_id, key.service_id
404            );
405            m.serialize_entry(&key_str, &data)?;
406        }
407        m.end()
408    }
409}
410
411fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
412    let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
413    let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
414    let content_nibbles = extract_content(e.descriptors.descriptors());
415    let ratings = extract_ratings(e.descriptors.descriptors());
416    let crids = extract_crids(e.descriptors.descriptors());
417
418    EpgEvent {
419        event_id: e.event_id,
420        start_time: e.start_time(),
421        duration: e.duration(),
422        running_status: e.running_status,
423        free_ca_mode: e.free_ca_mode,
424        event_name,
425        event_text,
426        extended_text,
427        extended_items,
428        content_nibbles,
429        ratings,
430        crids,
431    }
432}
433
434fn extract_short_event(
435    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
436) -> (Option<String>, Option<String>) {
437    for desc in descriptors {
438        if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
439            return (
440                Some(se.event_name.decode().into_owned()),
441                Some(se.text.decode().into_owned()),
442            );
443        }
444    }
445    (None, None)
446}
447
448struct ExtendedFragment {
449    descriptor_number: u8,
450    text: String,
451    items: Vec<(String, String)>,
452}
453
454fn extract_extended(
455    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
456) -> (Option<String>, Vec<(String, String)>) {
457    use crate::descriptors::AnyDescriptor;
458
459    let mut fragments: Vec<ExtendedFragment> = descriptors
460        .iter()
461        .filter_map(|d| {
462            if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
463                let text = ee.text.decode().into_owned();
464                let items: Vec<(String, String)> = ee
465                    .items
466                    .iter()
467                    .map(|i| {
468                        (
469                            i.description.decode().into_owned(),
470                            i.value.decode().into_owned(),
471                        )
472                    })
473                    .collect();
474                if !text.is_empty() || !items.is_empty() {
475                    Some(ExtendedFragment {
476                        descriptor_number: ee.descriptor_number,
477                        text,
478                        items,
479                    })
480                } else {
481                    None
482                }
483            } else {
484                None
485            }
486        })
487        .collect();
488
489    if fragments.is_empty() {
490        return (None, Vec::new());
491    }
492
493    // Sort by descriptor_number per EN 300 468 §6.2.15.
494    fragments.sort_by_key(|f| f.descriptor_number);
495
496    let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
497
498    let extended_items: Vec<(String, String)> =
499        fragments.into_iter().flat_map(|f| f.items).collect();
500
501    let text = if extended_text.is_empty() {
502        None
503    } else {
504        Some(extended_text)
505    };
506
507    (text, extended_items)
508}
509
510fn extract_content(
511    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
512) -> Vec<(u8, u8, u8)> {
513    for desc in descriptors {
514        if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
515            return ct
516                .entries
517                .iter()
518                .map(|e| (e.nibble_1, e.nibble_2, e.user_byte))
519                .collect();
520        }
521    }
522    Vec::new()
523}
524
525fn extract_ratings(
526    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
527) -> Vec<(String, u8)> {
528    for desc in descriptors {
529        if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
530            return pr
531                .entries
532                .iter()
533                .map(|e| (e.country_code.as_str().into_owned(), e.rating))
534                .collect();
535        }
536    }
537    Vec::new()
538}
539
540fn extract_crids(
541    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
542) -> Vec<(u8, String)> {
543    use crate::descriptors::content_identifier::CridLocation;
544    for desc in descriptors {
545        if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
546            return ci
547                .entries
548                .iter()
549                .filter_map(|e| match e.location {
550                    CridLocation::Inline(bytes) => {
551                        let s = String::from_utf8_lossy(bytes).into_owned();
552                        Some((e.crid_type, s))
553                    }
554                    _ => None,
555                })
556                .collect();
557        }
558    }
559    Vec::new()
560}
561
562fn extract_service_name(
563    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
564) -> Option<String> {
565    for desc in descriptors {
566        if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
567            return Some(svc.service_name.decode().into_owned());
568        }
569    }
570    None
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576    use chrono::{TimeZone, Utc};
577
578    // ------------------------------------------------------------------
579    // Helpers
580    // ------------------------------------------------------------------
581
582    /// Build the bytes of a minimal short_event_descriptor.
583    fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
584        let lang = b"eng";
585        let mut v = Vec::new();
586        v.push(0x4Du8); // tag
587        v.push((3 + 1 + name.len() + 1 + text.len()) as u8); // length
588        v.extend_from_slice(lang);
589        v.push(name.len() as u8);
590        v.extend_from_slice(name);
591        v.push(text.len() as u8);
592        v.extend_from_slice(text);
593        v
594    }
595
596    /// Build the bytes of a minimal EIT present/following section
597    /// with one event. Returns bytes formated as a complete TS section
598    /// (including CRC-32).
599    #[allow(clippy::too_many_arguments)]
600    fn eit_pf_section(
601        service_id: u16,
602        ts_id: u16,
603        on_id: u16,
604        event_id: u16,
605        version: u8,
606        start_raw: [u8; 5],
607        dur_raw: [u8; 3],
608        descriptors: &[u8],
609    ) -> Vec<u8> {
610        let table_id = 0x4Eu8;
611
612        // Header: 3 + ext_header(5) + post_ext(6) = 14
613        // Event: 12 + descriptors.len()
614        // CRC: 4
615        let ev_len = 12 + descriptors.len();
616        let section_length = 5 + 6 + ev_len + 4;
617        let total = 3 + section_length;
618
619        let mut buf = vec![0u8; total];
620        buf[0] = table_id;
621        buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
622        buf[2] = (section_length & 0xFF) as u8;
623        buf[3..5].copy_from_slice(&service_id.to_be_bytes());
624        // reserved(2)=0b11, version, current_next=1
625        buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
626        buf[6] = 0; // section_number
627        buf[7] = 0; // last_section_number
628        buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
629        buf[10..12].copy_from_slice(&on_id.to_be_bytes());
630        buf[12] = 0; // segment_last_section_number
631        buf[13] = 0x5F; // last_table_id
632
633        // Event
634        let ev_off = 14;
635        buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
636        buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
637        buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
638        let dll = descriptors.len() as u16;
639        buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
640        buf[ev_off + 11] = (dll & 0xFF) as u8;
641        buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
642
643        // CRC-32
644        let crc_pos = total - 4;
645        let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
646        buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
647        buf
648    }
649
650    // ------------------------------------------------------------------
651    // Basic tests
652    // ------------------------------------------------------------------
653
654    #[test]
655    fn new_store_is_empty() {
656        let store = EpgStore::new();
657        assert_eq!(store.service_count(), 0);
658        assert_eq!(store.event_count(), 0);
659    }
660
661    #[test]
662    fn feed_empty_is_error() {
663        let mut store = EpgStore::new();
664        assert!(store.feed(&[]).is_err());
665    }
666
667    #[test]
668    fn now_and_next_no_data_returns_none() {
669        let store = EpgStore::new();
670        let now = Utc::now();
671        let key = ServiceKey {
672            original_network_id: 1,
673            transport_stream_id: 1,
674            service_id: 100,
675        };
676        assert_eq!(store.now_and_next(key, now), (None, None));
677    }
678
679    #[test]
680    fn service_key_ordering() {
681        let a = ServiceKey {
682            original_network_id: 1,
683            transport_stream_id: 2,
684            service_id: 100,
685        };
686        let b = ServiceKey {
687            original_network_id: 1,
688            transport_stream_id: 2,
689            service_id: 200,
690        };
691        assert!(a < b);
692    }
693
694    #[test]
695    fn events_sorts_valid_before_invalid() {
696        let valid = EpgEvent {
697            event_id: 1,
698            start_time: Some(Utc::now()),
699            duration: Some(core::time::Duration::from_secs(3600)),
700            running_status: 0,
701            free_ca_mode: false,
702            event_name: None,
703            event_text: None,
704            extended_text: None,
705            extended_items: Vec::new(),
706            content_nibbles: Vec::new(),
707            ratings: Vec::new(),
708            crids: Vec::new(),
709        };
710        let invalid = EpgEvent {
711            event_id: 2,
712            start_time: None,
713            duration: None,
714            running_status: 0,
715            free_ca_mode: false,
716            event_name: None,
717            event_text: None,
718            extended_text: None,
719            extended_items: Vec::new(),
720            content_nibbles: Vec::new(),
721            ratings: Vec::new(),
722            crids: Vec::new(),
723        };
724
725        let mut events = [&invalid, &valid];
726        events.sort_by(|a, b| match (a.start_time, b.start_time) {
727            (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
728            (Some(_), None) => std::cmp::Ordering::Less,
729            (None, Some(_)) => std::cmp::Ordering::Greater,
730            (None, None) => a.event_id.cmp(&b.event_id),
731        });
732        assert_eq!(events[0].event_id, 1);
733        assert_eq!(events[1].event_id, 2);
734    }
735
736    // ------------------------------------------------------------------
737    // §6.2.15 extended event text chaining
738    // ------------------------------------------------------------------
739
740    #[test]
741    fn extended_text_chaining_per_spec_6_2_15() {
742        use crate::descriptors::extended_event::ExtendedEventDescriptor;
743        use crate::descriptors::AnyDescriptor;
744        use crate::text::{DvbText, LangCode};
745
746        // Fragment 1: descriptor_number=2, last_descriptor_number=3
747        // "The quick " + item ("Director", "Alice")
748        let frag1 = ExtendedEventDescriptor {
749            descriptor_number: 2,
750            last_descriptor_number: 3,
751            language_code: LangCode(*b"eng"),
752            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
753                description: DvbText::new(b"Director"),
754                value: DvbText::new(b"Alice"),
755            }],
756            text: DvbText::new(b"The quick "),
757        };
758
759        // Fragment 2: descriptor_number=0, last_descriptor_number=3
760        // "brown fox" + item ("Year", "2026")
761        let frag2 = ExtendedEventDescriptor {
762            descriptor_number: 0,
763            last_descriptor_number: 3,
764            language_code: LangCode(*b"eng"),
765            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
766                description: DvbText::new(b"Year"),
767                value: DvbText::new(b"2026"),
768            }],
769            text: DvbText::new(b"brown fox"),
770        };
771
772        // Fragment 3: descriptor_number=3, last_descriptor_number=3
773        // "jumps." + no items
774        let frag3 = ExtendedEventDescriptor {
775            descriptor_number: 3,
776            last_descriptor_number: 3,
777            language_code: LangCode(*b"eng"),
778            items: vec![],
779            text: DvbText::new(b"jumps."),
780        };
781
782        // Fragment 4: descriptor_number=1, last_descriptor_number=3
783        // empty text + item ("Genre", "Thriller") — dropped by the chaining
784        // helper (text is empty but items present → included)
785        let frag4 = ExtendedEventDescriptor {
786            descriptor_number: 1,
787            last_descriptor_number: 3,
788            language_code: LangCode(*b"eng"),
789            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
790                description: DvbText::new(b"Genre"),
791                value: DvbText::new(b"Thriller"),
792            }],
793            text: DvbText::new(b""),
794        };
795
796        // Feed fragments out of order via AnyDescriptor.
797        let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
798            Ok(AnyDescriptor::ExtendedEvent(frag1)), // dn=2
799            Ok(AnyDescriptor::ExtendedEvent(frag4)), // dn=1
800            Ok(AnyDescriptor::ExtendedEvent(frag3)), // dn=3
801            Ok(AnyDescriptor::ExtendedEvent(frag2)), // dn=0
802        ];
803
804        let (text, items) = extract_extended(&descriptors);
805
806        // Text concatenated in descriptor_number order: 0,1,2,3
807        assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
808
809        // Items accumulated in descriptor_number order: dn=0 ("Year"/"2026"),
810        // dn=1 ("Genre"/"Thriller"), dn=2 ("Director"/"Alice"), dn=3 (none)
811        assert_eq!(items.len(), 3);
812        assert_eq!(items[0], ("Year".to_string(), "2026".to_string()));
813        assert_eq!(items[1], ("Genre".to_string(), "Thriller".to_string()));
814        assert_eq!(items[2], ("Director".to_string(), "Alice".to_string()));
815    }
816
817    // ------------------------------------------------------------------
818    // now_and_next boundary correctness
819    // ------------------------------------------------------------------
820
821    #[test]
822    fn now_and_next_event_boundary() {
823        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
824        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
825        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
826
827        // Event 1: 10:00-11:00
828        // Event 2: 12:00-13:00
829        let sec = core::time::Duration::from_secs(3600);
830        let ev1 = EpgEvent {
831            event_id: 1,
832            start_time: Some(t1000),
833            duration: Some(sec),
834            running_status: 0,
835            free_ca_mode: false,
836            event_name: Some("Event 1".into()),
837            event_text: None,
838            extended_text: None,
839            extended_items: vec![],
840            content_nibbles: vec![],
841            ratings: vec![],
842            crids: vec![],
843        };
844        let ev2 = EpgEvent {
845            event_id: 2,
846            start_time: Some(t1200),
847            duration: Some(sec),
848            running_status: 0,
849            free_ca_mode: false,
850            event_name: Some("Event 2".into()),
851            event_text: None,
852            extended_text: None,
853            extended_items: vec![],
854            content_nibbles: vec![],
855            ratings: vec![],
856            crids: vec![],
857        };
858
859        // Set up store manually (bypass feed).
860        let mut store = EpgStore::new();
861        let key = ServiceKey {
862            original_network_id: 1,
863            transport_stream_id: 1,
864            service_id: 100,
865        };
866        let svc = store.cache.entry(key).or_default();
867        svc.events.insert(1, ev1);
868        svc.events.insert(2, ev2);
869
870        // At 10:30 — now=Event 1, next=Event 2
871        let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
872        let (now, next) = store.now_and_next(key, at);
873        assert_eq!(now.unwrap().event_id, 1);
874        assert_eq!(next.unwrap().event_id, 2);
875
876        // At 11:00 exactly — event 1 just ended (exclusive end),
877        // now=None, next=Event 2
878        let (now, next) = store.now_and_next(key, t1100);
879        assert!(now.is_none(), "event ending at query time must NOT be now");
880        assert_eq!(next.unwrap().event_id, 2);
881
882        // At 12:00 exactly — now=Event 2 (start == at, inclusive start),
883        // next=None
884        let (now, next) = store.now_and_next(key, t1200);
885        assert_eq!(now.unwrap().event_id, 2);
886        assert!(next.is_none());
887    }
888
889    // ------------------------------------------------------------------
890    // Version churn: bounded growth
891    // ------------------------------------------------------------------
892
893    #[test]
894    fn version_churn_bounded_growth() {
895        // Feed an event, then feed the same event_id with updated data.
896        // Store size must stay at 1 event.
897        let s = |hh: u32| {
898            let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
899            let days = 61785u16; // MJD for 2026-06-10
900            let mjd_bytes = days.to_be_bytes();
901            let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
902            (
903                [
904                    mjd_bytes[0],
905                    mjd_bytes[1],
906                    bcd_time[0],
907                    bcd_time[1],
908                    bcd_time[2],
909                ],
910                t,
911            )
912        };
913
914        let (start1, _) = s(10);
915        let (start2, _) = s(14);
916
917        let desc1 = short_event_bytes(b"News at 10", b"");
918        let desc2 = short_event_bytes(b"News at 14", b"");
919
920        let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
921        let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
922
923        let mut store = EpgStore::new();
924        store.feed(&eit1).unwrap();
925        assert_eq!(store.event_count(), 1);
926        store.feed(&eit2).unwrap();
927        // Same event_id should overwrite, not duplicate
928        assert_eq!(store.event_count(), 1);
929
930        let key = ServiceKey {
931            original_network_id: 1,
932            transport_stream_id: 1,
933            service_id: 100,
934        };
935        let evts = store.events(key).unwrap();
936        assert_eq!(evts.len(), 1);
937        assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
938    }
939
940    // ------------------------------------------------------------------
941    // schedule range query
942    // ------------------------------------------------------------------
943
944    #[test]
945    fn schedule_range_query() {
946        let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
947        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
948        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
949        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
950
951        let sec = core::time::Duration::from_secs(1800);
952        let mut store = EpgStore::new();
953        let key = ServiceKey {
954            original_network_id: 1,
955            transport_stream_id: 1,
956            service_id: 100,
957        };
958        let svc = store.cache.entry(key).or_default();
959        for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
960            svc.events.insert(
961                id,
962                EpgEvent {
963                    event_id: id,
964                    start_time: Some(t),
965                    duration: Some(sec),
966                    running_status: 0,
967                    free_ca_mode: false,
968                    event_name: Some(format!("Event {id}")),
969                    event_text: None,
970                    extended_text: None,
971                    extended_items: vec![],
972                    content_nibbles: vec![],
973                    ratings: vec![],
974                    crids: vec![],
975                },
976            );
977        }
978
979        // [10:00, 12:00) → events 2 and 3
980        let events = store.schedule(key, t1000, t1200).unwrap();
981        assert_eq!(events.len(), 2);
982        assert_eq!(events[0].event_id, 2);
983        assert_eq!(events[1].event_id, 3);
984
985        // [12:00, 13:00) → empty
986        let events = store.schedule(key, t1200, t1100).unwrap();
987        assert!(events.is_empty());
988    }
989
990    // ------------------------------------------------------------------
991    // serde round-trip
992    // ------------------------------------------------------------------
993
994    #[cfg(feature = "serde")]
995    #[test]
996    fn serde_serializes_store_as_json() {
997        let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
998        let mut store = EpgStore::new();
999        let key = ServiceKey {
1000            original_network_id: 1,
1001            transport_stream_id: 1,
1002            service_id: 100,
1003        };
1004        let svc = store.cache.entry(key).or_default();
1005        svc.service_name = Some("BBC One".into());
1006        svc.events.insert(
1007            1,
1008            EpgEvent {
1009                event_id: 1,
1010                start_time: Some(t),
1011                duration: Some(core::time::Duration::from_secs(3600)),
1012                running_status: 4,
1013                free_ca_mode: false,
1014                event_name: Some("The News".into()),
1015                event_text: Some("Today's headlines".into()),
1016                extended_text: None,
1017                extended_items: vec![],
1018                content_nibbles: vec![(1, 1, 0)],
1019                ratings: vec![],
1020                crids: vec![],
1021            },
1022        );
1023
1024        let json = serde_json::to_string(&store).unwrap();
1025        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1026        let svc_data = &v["1-1-100"];
1027        assert_eq!(svc_data["serviceName"], "BBC One");
1028        assert_eq!(svc_data["events"][0]["eventName"], "The News");
1029        assert_eq!(
1030            svc_data["events"][0]["contentNibbles"][0],
1031            serde_json::json!([1, 1, 0])
1032        );
1033    }
1034}