1use crate::collect::CollectResult;
87use crate::tables::RunningStatus;
88use alloc::collections::BTreeMap;
89use alloc::format;
90use alloc::string::String;
91use alloc::vec::Vec;
92
/// Identifies a single service by its DVB triplet.
///
/// The derived ordering compares `original_network_id` first, then
/// `transport_stream_id`, then `service_id`; the field order below is
/// therefore part of the type's contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ServiceKey {
    /// Identifier of the originating network.
    pub original_network_id: u16,
    /// Identifier of the transport stream carrying the service.
    pub transport_stream_id: u16,
    /// Identifier of the service within its transport stream.
    pub service_id: u16,
}
104
/// One parental-rating entry: a country code and the raw rating byte.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct Rating {
    /// Country code the rating applies to, as text.
    pub country: String,
    /// Raw rating value as carried in the descriptor.
    pub value: u8,
}

impl Rating {
    /// Returns the minimum recommended age encoded by `value`.
    ///
    /// Values `0x01..=0x0F` map to `value + 3` (so `0x01` is 4 years and
    /// `0x0F` is 18 years). Any other value yields `None`.
    #[must_use]
    pub fn minimum_age(&self) -> Option<u8> {
        let raw = self.value;
        if (0x01..=0x0F).contains(&raw) {
            Some(raw + 3)
        } else {
            None
        }
    }
}
127
128pub use crate::descriptors::content_identifier::CridType;
132
133#[non_exhaustive]
136#[derive(Debug, Clone, PartialEq, Eq)]
137#[cfg_attr(feature = "serde", derive(serde::Serialize))]
138pub struct Crid {
139 pub crid_type: CridType,
141 pub crid: String,
143}
144
/// One description/value pair taken from an extended event descriptor.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ExtendedItem {
    /// Decoded item description (the label, e.g. "Director").
    pub description: String,
    /// Decoded item value (e.g. a person's name).
    pub item: String,
}
155
/// One entry of a content descriptor: two genre nibbles plus a user byte.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ContentNibble {
    /// First-level content nibble (top-level genre).
    pub level_1: u8,
    /// Second-level content nibble (sub-genre).
    pub level_2: u8,
    /// User-defined byte, stored as received.
    pub user: u8,
}
169
170impl ContentNibble {
171 #[must_use]
173 pub fn genre(&self) -> crate::descriptors::content::ContentGenre {
174 crate::descriptors::content::ContentGenre::from_nibble_1(self.level_1)
175 }
176
177 #[must_use]
179 pub fn genre_name(&self) -> &'static str {
180 crate::descriptors::content::content_genre_name(self.level_1, self.level_2)
181 }
182}
183
184#[non_exhaustive]
196#[derive(Debug, Clone, PartialEq, Eq)]
197#[cfg_attr(feature = "serde", derive(serde::Serialize))]
198pub struct EpgEvent {
199 pub event_id: u16,
201 pub start_time: Option<chrono::DateTime<chrono::Utc>>,
203 pub duration: Option<core::time::Duration>,
205 pub running_status: RunningStatus,
207 pub free_ca_mode: bool,
209 pub event_name: Option<String>,
213 pub event_text: Option<String>,
215 pub extended_text: Option<String>,
220 #[cfg_attr(feature = "serde", serde(default))]
224 pub extended_items: Vec<ExtendedItem>,
225 #[cfg_attr(feature = "serde", serde(default))]
228 pub content_nibbles: Vec<ContentNibble>,
229 #[cfg_attr(feature = "serde", serde(default))]
232 pub ratings: Vec<Rating>,
233 #[cfg_attr(feature = "serde", serde(default))]
236 pub crids: Vec<Crid>,
237}
238
/// Serialization-only snapshot of one service: its name plus its events.
///
/// The type is constructed solely by the `serde::Serialize` impl of
/// `EpgStore`, so it is compiled only when the `serde` feature is enabled.
/// Without the gate the struct is never constructed in non-serde builds and
/// trips the `dead_code` lint.
#[cfg(feature = "serde")]
#[derive(Debug, Clone, serde::Serialize)]
struct ServiceData {
    /// Service name; omitted from the output when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    service_name: Option<String>,
    /// Events of the service, in the order chosen by the serializer impl.
    events: Vec<EpgEvent>,
}
247
/// Default upper bound on the number of services an `EpgStore` keeps.
pub const DEFAULT_MAX_SERVICES: usize = 1_024;

/// Default upper bound on the number of events kept per service.
pub const DEFAULT_MAX_EVENTS_PER_SERVICE: usize = 8_192;
261
262#[derive(Debug)]
304pub struct EpgStore {
305 collector: crate::collect::EitCollector,
306 cache: BTreeMap<ServiceKey, ServiceEpg>,
307 max_services: usize,
308 max_events_per_service: usize,
309}
310
311impl Default for EpgStore {
312 fn default() -> Self {
313 Self {
314 collector: crate::collect::EitCollector::default(),
315 cache: BTreeMap::new(),
316 max_services: DEFAULT_MAX_SERVICES,
317 max_events_per_service: DEFAULT_MAX_EVENTS_PER_SERVICE,
318 }
319 }
320}
321
322#[derive(Debug, Default)]
323struct ServiceEpg {
324 service_name: Option<String>,
325 events: BTreeMap<u16, EpgEvent>,
327}
328
329impl EpgStore {
330 #[must_use]
333 pub fn new() -> Self {
334 Self::default()
335 }
336
337 #[must_use]
342 pub fn with_max_services(mut self, max_services: usize) -> Self {
343 self.max_services = max_services;
344 self
345 }
346
347 #[must_use]
352 pub fn with_max_events_per_service(mut self, max_events_per_service: usize) -> Self {
353 self.max_events_per_service = max_events_per_service;
354 self
355 }
356
357 #[must_use]
361 pub fn with_collector_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
362 self.collector = self.collector.with_max_logical_keys(max_logical_keys);
363 self
364 }
365
366 pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
375 self.feed_with_pid(None, bytes)
376 }
377
378 pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
380 if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
381 let tables = completed.tables()?;
382 for table in &tables {
383 let key = ServiceKey {
384 original_network_id: table.original_network_id,
385 transport_stream_id: table.transport_stream_id,
386 service_id: table.service_id,
387 };
388 if self.cache.len() >= self.max_services && !self.cache.contains_key(&key) {
389 continue;
390 }
391 let svc = self.cache.entry(key).or_default();
392 for event in &table.events {
393 if svc.events.len() >= self.max_events_per_service
394 && !svc.events.contains_key(&event.event_id)
395 {
396 continue;
397 }
398 svc.events.insert(event.event_id, event_to_epg(event));
399 }
400 }
401 }
402 Ok(())
403 }
404
405 pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
410 for svc in &sdt.services {
411 let key = ServiceKey {
412 original_network_id: sdt.original_network_id,
413 transport_stream_id: sdt.transport_stream_id,
414 service_id: svc.service_id,
415 };
416 let entry = self.cache.entry(key).or_default();
417 entry.service_name = extract_service_name(svc.descriptors.descriptors());
418 }
419 }
420
421 pub fn now_and_next(
435 &self,
436 key: ServiceKey,
437 at: chrono::DateTime<chrono::Utc>,
438 ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
439 let Some(svc) = self.cache.get(&key) else {
440 return (None, None);
441 };
442
443 let now = svc.events.values().find(|e| {
444 if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
445 let end = start + dur;
446 return at >= start && at < end;
447 }
448 false
449 });
450
451 let next = svc
452 .events
453 .values()
454 .filter(|e| {
455 if let Some(start) = e.start_time {
456 start > at
457 } else {
458 false
459 }
460 })
461 .min_by_key(|e| e.start_time);
462
463 (now, next)
464 }
465
466 #[must_use]
471 pub fn schedule(
472 &self,
473 key: ServiceKey,
474 from: chrono::DateTime<chrono::Utc>,
475 to: chrono::DateTime<chrono::Utc>,
476 ) -> Option<Vec<&EpgEvent>> {
477 let svc = self.cache.get(&key)?;
478 let mut events: Vec<&EpgEvent> = svc
479 .events
480 .values()
481 .filter(|e| {
482 if let Some(start) = e.start_time {
483 start >= from && start < to
484 } else {
485 false
486 }
487 })
488 .collect();
489 events.sort_by(|a, b| cmp_event_by_start(a, b));
490 Some(events)
491 }
492
493 #[must_use]
495 pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
496 self.cache.get(&key).and_then(|s| s.service_name.as_deref())
497 }
498
499 pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
503 self.cache.keys().copied()
504 }
505
506 #[must_use]
509 pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
510 let svc = self.cache.get(&key)?;
511 let mut events: Vec<&EpgEvent> = svc.events.values().collect();
512 events.sort_by(|a, b| cmp_event_by_start(a, b));
513 Some(events)
514 }
515
516 #[must_use]
518 pub fn service_count(&self) -> usize {
519 self.cache.len()
520 }
521
522 #[must_use]
524 pub fn event_count(&self) -> usize {
525 self.cache.values().map(|s| s.events.len()).sum()
526 }
527
528 pub fn retain_services<F>(&mut self, mut keep: F)
533 where
534 F: FnMut(&ServiceKey) -> bool,
535 {
536 self.cache.retain(|key, _| keep(key));
537 self.collector.retain_logical(|lk| {
538 keep(&ServiceKey {
539 original_network_id: lk.original_network_id,
540 transport_stream_id: lk.transport_stream_id,
541 service_id: lk.service_id,
542 })
543 });
544 }
545
546 pub fn clear(&mut self) {
548 self.collector.clear();
549 self.cache.clear();
550 }
551}
552
/// Serializes the store as a map from `"<onid>-<tsid>-<sid>"` to the
/// service's name and its events (sorted with `cmp_event_by_start`).
#[cfg(feature = "serde")]
impl serde::Serialize for EpgStore {
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeMap;

        let mut map = s.serialize_map(Some(self.cache.len()))?;
        for (key, svc) in &self.cache {
            // Events are cloned because `ServiceData` owns its contents.
            let mut events: Vec<EpgEvent> = svc.events.values().cloned().collect();
            events.sort_by(cmp_event_by_start);

            let ServiceKey {
                original_network_id,
                transport_stream_id,
                service_id,
            } = *key;
            let label = format!(
                "{}-{}-{}",
                original_network_id, transport_stream_id, service_id
            );

            let payload = ServiceData {
                service_name: svc.service_name.clone(),
                events,
            };
            map.serialize_entry(&label, &payload)?;
        }
        map.end()
    }
}
576
577fn cmp_event_by_start(a: &EpgEvent, b: &EpgEvent) -> core::cmp::Ordering {
578 match (a.start_time, b.start_time) {
579 (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
580 (Some(_), None) => core::cmp::Ordering::Less,
581 (None, Some(_)) => core::cmp::Ordering::Greater,
582 (None, None) => a.event_id.cmp(&b.event_id),
583 }
584}
585
586fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
587 let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
588 let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
589 let content_nibbles = extract_content(e.descriptors.descriptors());
590 let ratings = extract_ratings(e.descriptors.descriptors());
591 let crids = extract_crids(e.descriptors.descriptors());
592
593 EpgEvent {
594 event_id: e.event_id,
595 start_time: e.start_time(),
596 duration: e.duration(),
597 running_status: e.running_status,
598 free_ca_mode: e.free_ca_mode,
599 event_name,
600 event_text,
601 extended_text,
602 extended_items,
603 content_nibbles,
604 ratings,
605 crids,
606 }
607}
608
609fn extract_short_event(
610 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
611) -> (Option<String>, Option<String>) {
612 for desc in descriptors {
613 if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
614 return (
615 Some(se.event_name.decode().into_owned()),
616 Some(se.text.decode().into_owned()),
617 );
618 }
619 }
620 (None, None)
621}
622
623struct ExtendedFragment {
624 descriptor_number: u8,
625 text: String,
626 items: Vec<ExtendedItem>,
627}
628
629fn extract_extended(
630 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
631) -> (Option<String>, Vec<ExtendedItem>) {
632 use crate::descriptors::AnyDescriptor;
633
634 let mut fragments: Vec<ExtendedFragment> = descriptors
635 .iter()
636 .filter_map(|d| {
637 if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
638 let text = ee.text.decode().into_owned();
639 let items: Vec<ExtendedItem> = ee
640 .items
641 .iter()
642 .map(|i| ExtendedItem {
643 description: i.description.decode().into_owned(),
644 item: i.value.decode().into_owned(),
645 })
646 .collect();
647 if !text.is_empty() || !items.is_empty() {
648 Some(ExtendedFragment {
649 descriptor_number: ee.descriptor_number,
650 text,
651 items,
652 })
653 } else {
654 None
655 }
656 } else {
657 None
658 }
659 })
660 .collect();
661
662 if fragments.is_empty() {
663 return (None, Vec::new());
664 }
665
666 fragments.sort_by_key(|f| f.descriptor_number);
668
669 let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
670
671 let extended_items: Vec<ExtendedItem> = fragments.into_iter().flat_map(|f| f.items).collect();
672
673 let text = if extended_text.is_empty() {
674 None
675 } else {
676 Some(extended_text)
677 };
678
679 (text, extended_items)
680}
681
682fn extract_content(
683 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
684) -> Vec<ContentNibble> {
685 for desc in descriptors {
686 if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
687 return ct
688 .entries
689 .iter()
690 .map(|e| ContentNibble {
691 level_1: e.nibble_1,
692 level_2: e.nibble_2,
693 user: e.user_byte,
694 })
695 .collect();
696 }
697 }
698 Vec::new()
699}
700
701fn extract_ratings(
702 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
703) -> Vec<Rating> {
704 for desc in descriptors {
705 if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
706 return pr
707 .entries
708 .iter()
709 .map(|e| Rating {
710 country: e.country_code.as_str().into_owned(),
711 value: e.rating,
712 })
713 .collect();
714 }
715 }
716 Vec::new()
717}
718
719fn extract_crids(
720 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
721) -> Vec<Crid> {
722 use crate::descriptors::content_identifier::CridLocation;
723 for desc in descriptors {
724 if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
725 return ci
726 .entries
727 .iter()
728 .filter_map(|e| match e.location {
729 CridLocation::Inline(bytes) => {
730 let s = String::from_utf8_lossy(bytes).into_owned();
731 Some(Crid {
732 crid_type: e.crid_type,
733 crid: s,
734 })
735 }
736 _ => None,
737 })
738 .collect();
739 }
740 }
741 Vec::new()
742}
743
744fn extract_service_name(
745 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
746) -> Option<String> {
747 for desc in descriptors {
748 if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
749 return Some(svc.service_name.decode().into_owned());
750 }
751 }
752 None
753}
754
755#[cfg(test)]
756mod tests {
757 use super::*;
758 use chrono::{TimeZone, Utc};
759
760 fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
766 let lang = b"eng";
767 let mut v = Vec::new();
768 v.push(0x4Du8); v.push((3 + 1 + name.len() + 1 + text.len()) as u8); v.extend_from_slice(lang);
771 v.push(name.len() as u8);
772 v.extend_from_slice(name);
773 v.push(text.len() as u8);
774 v.extend_from_slice(text);
775 v
776 }
777
778 #[allow(clippy::too_many_arguments)]
782 fn eit_pf_section(
783 service_id: u16,
784 ts_id: u16,
785 on_id: u16,
786 event_id: u16,
787 version: u8,
788 start_raw: [u8; 5],
789 dur_raw: [u8; 3],
790 descriptors: &[u8],
791 ) -> Vec<u8> {
792 let table_id = 0x4Eu8;
793
794 let ev_len = 12 + descriptors.len();
798 let section_length = 5 + 6 + ev_len + 4;
799 let total = 3 + section_length;
800
801 let mut buf = vec![0u8; total];
802 buf[0] = table_id;
803 buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
804 buf[2] = (section_length & 0xFF) as u8;
805 buf[3..5].copy_from_slice(&service_id.to_be_bytes());
806 buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
808 buf[6] = 0; buf[7] = 0; buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
811 buf[10..12].copy_from_slice(&on_id.to_be_bytes());
812 buf[12] = 0; buf[13] = 0x5F; let ev_off = 14;
817 buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
818 buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
819 buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
820 let dll = descriptors.len() as u16;
821 buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
822 buf[ev_off + 11] = (dll & 0xFF) as u8;
823 buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
824
825 let crc_pos = total - 4;
827 let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
828 buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
829 buf
830 }
831
832 fn start_raw(year: i32, month: u32, day: u32, hour: u32) -> [u8; 5] {
835 let mjd = mjd_approx(year, month, day);
836 let mjd_bytes = mjd.to_be_bytes();
837 let bcd_hour = ((hour / 10 * 16) + (hour % 10)) as u8;
838 [
839 mjd_bytes[0],
840 mjd_bytes[1],
841 bcd_hour,
842 0, 0, ]
845 }
846
847 fn mjd_approx(year: i32, month: u32, day: u32) -> u16 {
849 assert!(
850 (year, month, day) == (2026, 6, 10),
851 "mjd_approx only supports 2026-06-10"
852 );
853 61785
854 }
855
856 fn content_descriptor_bytes(entries: &[(u8, u8, u8)]) -> Vec<u8> {
858 let mut v = vec![0x54u8, (entries.len() * 2) as u8];
859 for &(n1, n2, u) in entries {
860 v.push((n1 << 4) | n2);
861 v.push(u);
862 }
863 v
864 }
865
866 fn parental_rating_bytes(entries: &[([u8; 3], u8)]) -> Vec<u8> {
868 let mut v = vec![0x55u8, (entries.len() * 4) as u8];
869 for (country, rating) in entries {
870 v.extend_from_slice(country);
871 v.push(*rating);
872 }
873 v
874 }
875
876 fn content_identifier_bytes(entries: &[(u8, &[u8])]) -> Vec<u8> {
879 let body_len: usize = entries.iter().map(|(_, data)| 2 + data.len()).sum();
880 let mut v = vec![0x76u8, body_len as u8];
881 for (crid_type, data) in entries {
882 v.push(crid_type << 2); v.push(data.len() as u8);
884 v.extend_from_slice(data);
885 }
886 v
887 }
888
889 #[test]
894 fn new_store_is_empty() {
895 let store = EpgStore::new();
896 assert_eq!(store.service_count(), 0);
897 assert_eq!(store.event_count(), 0);
898 }
899
900 #[test]
901 fn feed_empty_is_error() {
902 let mut store = EpgStore::new();
903 assert!(store.feed(&[]).is_err());
904 }
905
906 #[test]
907 fn now_and_next_no_data_returns_none() {
908 let store = EpgStore::new();
909 let now = Utc::now();
910 let key = ServiceKey {
911 original_network_id: 1,
912 transport_stream_id: 1,
913 service_id: 100,
914 };
915 assert_eq!(store.now_and_next(key, now), (None, None));
916 }
917
918 #[test]
919 fn service_key_ordering() {
920 let a = ServiceKey {
921 original_network_id: 1,
922 transport_stream_id: 2,
923 service_id: 100,
924 };
925 let b = ServiceKey {
926 original_network_id: 1,
927 transport_stream_id: 2,
928 service_id: 200,
929 };
930 assert!(a < b);
931 }
932
933 fn empty_event(
934 id: u16,
935 start: Option<chrono::DateTime<chrono::Utc>>,
936 dur: Option<core::time::Duration>,
937 ) -> EpgEvent {
938 EpgEvent {
939 event_id: id,
940 start_time: start,
941 duration: dur,
942 running_status: RunningStatus::Undefined,
943 free_ca_mode: false,
944 event_name: None,
945 event_text: None,
946 extended_text: None,
947 extended_items: Vec::new(),
948 content_nibbles: Vec::new(),
949 ratings: Vec::new(),
950 crids: Vec::new(),
951 }
952 }
953
954 #[test]
955 fn events_sorts_valid_before_invalid() {
956 let valid = empty_event(
957 1,
958 Some(Utc::now()),
959 Some(core::time::Duration::from_secs(3600)),
960 );
961 let invalid = empty_event(2, None, None);
962
963 let mut events = [&invalid, &valid];
964 events.sort_by(|a, b| cmp_event_by_start(a, b));
965 assert_eq!(events[0].event_id, 1);
966 assert_eq!(events[1].event_id, 2);
967 }
968
969 #[test]
974 fn extended_text_chaining_per_spec_6_2_15() {
975 use crate::descriptors::extended_event::ExtendedEventDescriptor;
976 use crate::descriptors::AnyDescriptor;
977 use crate::text::{DvbText, LangCode};
978
979 let frag1 = ExtendedEventDescriptor {
982 descriptor_number: 2,
983 last_descriptor_number: 3,
984 language_code: LangCode(*b"eng"),
985 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
986 description: DvbText::new(b"Director"),
987 value: DvbText::new(b"Alice"),
988 }],
989 text: DvbText::new(b"The quick "),
990 };
991
992 let frag2 = ExtendedEventDescriptor {
995 descriptor_number: 0,
996 last_descriptor_number: 3,
997 language_code: LangCode(*b"eng"),
998 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
999 description: DvbText::new(b"Year"),
1000 value: DvbText::new(b"2026"),
1001 }],
1002 text: DvbText::new(b"brown fox"),
1003 };
1004
1005 let frag3 = ExtendedEventDescriptor {
1008 descriptor_number: 3,
1009 last_descriptor_number: 3,
1010 language_code: LangCode(*b"eng"),
1011 items: vec![],
1012 text: DvbText::new(b"jumps."),
1013 };
1014
1015 let frag4 = ExtendedEventDescriptor {
1019 descriptor_number: 1,
1020 last_descriptor_number: 3,
1021 language_code: LangCode(*b"eng"),
1022 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
1023 description: DvbText::new(b"Genre"),
1024 value: DvbText::new(b"Thriller"),
1025 }],
1026 text: DvbText::new(b""),
1027 };
1028
1029 let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
1031 Ok(AnyDescriptor::ExtendedEvent(frag1)), Ok(AnyDescriptor::ExtendedEvent(frag4)), Ok(AnyDescriptor::ExtendedEvent(frag3)), Ok(AnyDescriptor::ExtendedEvent(frag2)), ];
1036
1037 let (text, items) = extract_extended(&descriptors);
1038
1039 assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
1041
1042 assert_eq!(items.len(), 3);
1045 assert_eq!(
1046 items[0],
1047 ExtendedItem {
1048 description: "Year".into(),
1049 item: "2026".into()
1050 }
1051 );
1052 assert_eq!(
1053 items[1],
1054 ExtendedItem {
1055 description: "Genre".into(),
1056 item: "Thriller".into()
1057 }
1058 );
1059 assert_eq!(
1060 items[2],
1061 ExtendedItem {
1062 description: "Director".into(),
1063 item: "Alice".into()
1064 }
1065 );
1066 }
1067
1068 #[test]
1073 fn now_and_next_event_boundary() {
1074 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1075 let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1076 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1077
1078 let sec = core::time::Duration::from_secs(3600);
1081 let ev1 = EpgEvent {
1082 event_id: 1,
1083 start_time: Some(t1000),
1084 duration: Some(sec),
1085 running_status: RunningStatus::Undefined,
1086 free_ca_mode: false,
1087 event_name: Some("Event 1".into()),
1088 event_text: None,
1089 extended_text: None,
1090 extended_items: vec![],
1091 content_nibbles: vec![],
1092 ratings: vec![],
1093 crids: vec![],
1094 };
1095 let ev2 = EpgEvent {
1096 event_id: 2,
1097 start_time: Some(t1200),
1098 duration: Some(sec),
1099 running_status: RunningStatus::Undefined,
1100 free_ca_mode: false,
1101 event_name: Some("Event 2".into()),
1102 event_text: None,
1103 extended_text: None,
1104 extended_items: vec![],
1105 content_nibbles: vec![],
1106 ratings: vec![],
1107 crids: vec![],
1108 };
1109
1110 let mut store = EpgStore::new();
1112 let key = ServiceKey {
1113 original_network_id: 1,
1114 transport_stream_id: 1,
1115 service_id: 100,
1116 };
1117 let svc = store.cache.entry(key).or_default();
1118 svc.events.insert(1, ev1);
1119 svc.events.insert(2, ev2);
1120
1121 let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
1123 let (now, next) = store.now_and_next(key, at);
1124 assert_eq!(now.unwrap().event_id, 1);
1125 assert_eq!(next.unwrap().event_id, 2);
1126
1127 let (now, next) = store.now_and_next(key, t1100);
1130 assert!(now.is_none(), "event ending at query time must NOT be now");
1131 assert_eq!(next.unwrap().event_id, 2);
1132
1133 let (now, next) = store.now_and_next(key, t1200);
1136 assert_eq!(now.unwrap().event_id, 2);
1137 assert!(next.is_none());
1138 }
1139
1140 #[test]
1145 fn now_and_next_returns_earliest_future_event() {
1146 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1152 let t1400 = Utc.with_ymd_and_hms(2026, 6, 10, 14, 0, 0).unwrap();
1153 let t1600 = Utc.with_ymd_and_hms(2026, 6, 10, 16, 0, 0).unwrap();
1154 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1155
1156 let sec = core::time::Duration::from_secs(3600);
1157
1158 fn named_event(
1159 id: u16,
1160 start: chrono::DateTime<chrono::Utc>,
1161 dur: core::time::Duration,
1162 name: &str,
1163 ) -> EpgEvent {
1164 EpgEvent {
1165 event_id: id,
1166 start_time: Some(start),
1167 duration: Some(dur),
1168 running_status: RunningStatus::Undefined,
1169 free_ca_mode: false,
1170 event_name: Some(name.into()),
1171 event_text: None,
1172 extended_text: None,
1173 extended_items: vec![],
1174 content_nibbles: vec![],
1175 ratings: vec![],
1176 crids: vec![],
1177 }
1178 }
1179
1180 let mut store = EpgStore::new();
1181 let key = ServiceKey {
1182 original_network_id: 1,
1183 transport_stream_id: 1,
1184 service_id: 100,
1185 };
1186 let svc = store.cache.entry(key).or_default();
1187 svc.events.insert(3, named_event(3, t1400, sec, "Event 14"));
1189 svc.events.insert(1, named_event(1, t1200, sec, "Event 12"));
1190 svc.events.insert(2, named_event(2, t1600, sec, "Event 16"));
1191
1192 let (_now, next) = store.now_and_next(key, t1000);
1194 let next = next.expect("next event must exist");
1195 assert_eq!(
1196 next.event_id, 1,
1197 "next must be earliest future event (12:00), not first in iteration order"
1198 );
1199 }
1200
1201 #[test]
1206 fn extract_content_ratings_crids_through_feed() {
1207 let content = content_descriptor_bytes(&[(3, 1, 0xAA), (4, 2, 0xBB)]);
1208 let ratings = parental_rating_bytes(&[(*b"FRA", 0x05), (*b"GBR", 0x01)]);
1209 let crids = content_identifier_bytes(&[
1210 (0x01, b"crid://bbc.co.uk/prog123"),
1211 (0x03, b"crid://bbc.co.uk/rec456"),
1212 ]);
1213
1214 let mut descriptors = Vec::new();
1215 descriptors.extend_from_slice(&content);
1216 descriptors.extend_from_slice(&ratings);
1217 descriptors.extend_from_slice(&crids);
1218
1219 let sr = start_raw(2026, 6, 10, 10);
1220 let eit = eit_pf_section(100, 1, 1, 1, 0, sr, [1, 0, 0], &descriptors);
1221
1222 let mut store = EpgStore::new();
1223 store.feed(&eit).unwrap();
1224
1225 let key = ServiceKey {
1226 original_network_id: 1,
1227 transport_stream_id: 1,
1228 service_id: 100,
1229 };
1230 let events = store.events(key).unwrap();
1231 assert_eq!(events.len(), 1);
1232 let ev = &events[0];
1233
1234 assert_eq!(ev.content_nibbles.len(), 2);
1235 assert_eq!(
1236 ev.content_nibbles[0],
1237 ContentNibble {
1238 level_1: 3,
1239 level_2: 1,
1240 user: 0xAA
1241 }
1242 );
1243 assert_eq!(
1244 ev.content_nibbles[1],
1245 ContentNibble {
1246 level_1: 4,
1247 level_2: 2,
1248 user: 0xBB
1249 }
1250 );
1251
1252 assert_eq!(ev.ratings.len(), 2);
1253 assert_eq!(ev.ratings[0].country, "FRA");
1254 assert_eq!(ev.ratings[0].value, 0x05);
1255 assert_eq!(ev.ratings[1].country, "GBR");
1256 assert_eq!(ev.ratings[1].value, 0x01);
1257
1258 assert_eq!(ev.crids.len(), 1 + 1); assert_eq!(ev.crids[0].crid_type, CridType::ItemOfContent);
1260 assert_eq!(ev.crids[0].crid, "crid://bbc.co.uk/prog123");
1261 assert_eq!(ev.crids[1].crid_type, CridType::Recommendation);
1262 assert_eq!(ev.crids[1].crid, "crid://bbc.co.uk/rec456");
1263 }
1264
1265 #[test]
1266 fn extract_service_name_through_feed_sdt() {
1267 use crate::collect::SectionSetCollector;
1268
1269 let svc_desc = {
1271 let provider = b"BBC";
1272 let name = b"BBC ONE HD";
1273 let mut v = vec![0x48u8, (1 + 1 + provider.len() + 1 + name.len()) as u8];
1274 v.push(0x01); v.push(provider.len() as u8);
1276 v.extend_from_slice(provider);
1277 v.push(name.len() as u8);
1278 v.extend_from_slice(name);
1279 v
1280 };
1281
1282 let sdt_bytes = {
1284 let dll = svc_desc.len() as u16;
1285 let svc_entry_len = 5 + dll as usize;
1287 let section_length: u16 = 5 + 3 + svc_entry_len as u16 + 4;
1289 let mut buf = vec![0u8; 3 + section_length as usize];
1290 buf[0] = 0x42; buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
1292 buf[2] = (section_length & 0xFF) as u8;
1293 buf[3..5].copy_from_slice(&1u16.to_be_bytes()); buf[5] = 0xC1; buf[6] = 0; buf[7] = 0; buf[8..10].copy_from_slice(&1u16.to_be_bytes()); buf[10] = 0xFF; let off = 11;
1302 buf[off..off + 2].copy_from_slice(&100u16.to_be_bytes()); buf[off + 2] = 0xFC; buf[off + 3] = ((dll >> 8) as u8) & 0x0F;
1305 buf[off + 4] = (dll & 0xFF) as u8;
1306 buf[off + 5..off + 5 + svc_desc.len()].copy_from_slice(&svc_desc);
1307
1308 let crc_off = buf.len() - 4;
1310 let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_off]);
1311 buf[crc_off..].copy_from_slice(&crc.to_be_bytes());
1312 buf
1313 };
1314
1315 let mut collector = SectionSetCollector::new();
1316 let complete = collector.push_section(&sdt_bytes).unwrap().unwrap();
1317 let sdt = complete.sdt().unwrap();
1318
1319 let mut store = EpgStore::new();
1320 store.feed_sdt(&sdt);
1321
1322 let key = ServiceKey {
1323 original_network_id: 1,
1324 transport_stream_id: 1,
1325 service_id: 100,
1326 };
1327 assert_eq!(store.service_name(key), Some("BBC ONE HD"));
1328 assert_eq!(store.service_count(), 1);
1329 }
1330
1331 #[test]
1336 fn version_churn_bounded_growth() {
1337 let s = |hh: u32| {
1340 let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
1341 let days = 61785u16; let mjd_bytes = days.to_be_bytes();
1343 let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
1344 (
1345 [
1346 mjd_bytes[0],
1347 mjd_bytes[1],
1348 bcd_time[0],
1349 bcd_time[1],
1350 bcd_time[2],
1351 ],
1352 t,
1353 )
1354 };
1355
1356 let (start1, _) = s(10);
1357 let (start2, _) = s(14);
1358
1359 let desc1 = short_event_bytes(b"News at 10", b"");
1360 let desc2 = short_event_bytes(b"News at 14", b"");
1361
1362 let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
1363 let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
1364
1365 let mut store = EpgStore::new();
1366 store.feed(&eit1).unwrap();
1367 assert_eq!(store.event_count(), 1);
1368 store.feed(&eit2).unwrap();
1369 assert_eq!(store.event_count(), 1);
1371
1372 let key = ServiceKey {
1373 original_network_id: 1,
1374 transport_stream_id: 1,
1375 service_id: 100,
1376 };
1377 let evts = store.events(key).unwrap();
1378 assert_eq!(evts.len(), 1);
1379 assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
1380 }
1381
1382 #[test]
1387 fn schedule_range_query() {
1388 let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
1389 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1390 let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1391 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1392
1393 let sec = core::time::Duration::from_secs(1800);
1394 let mut store = EpgStore::new();
1395 let key = ServiceKey {
1396 original_network_id: 1,
1397 transport_stream_id: 1,
1398 service_id: 100,
1399 };
1400 let svc = store.cache.entry(key).or_default();
1401 for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
1402 svc.events.insert(
1403 id,
1404 EpgEvent {
1405 event_id: id,
1406 start_time: Some(t),
1407 duration: Some(sec),
1408 running_status: RunningStatus::Undefined,
1409 free_ca_mode: false,
1410 event_name: Some(format!("Event {id}")),
1411 event_text: None,
1412 extended_text: None,
1413 extended_items: vec![],
1414 content_nibbles: vec![],
1415 ratings: vec![],
1416 crids: vec![],
1417 },
1418 );
1419 }
1420
1421 let events = store.schedule(key, t1000, t1200).unwrap();
1423 assert_eq!(events.len(), 2);
1424 assert_eq!(events[0].event_id, 2);
1425 assert_eq!(events[1].event_id, 3);
1426
1427 let events = store.schedule(key, t1200, t1100).unwrap();
1429 assert!(events.is_empty());
1430 }
1431
1432 #[test]
1437 fn max_services_capped() {
1438 let mut store = EpgStore::new().with_max_services(2);
1441
1442 let desc = short_event_bytes(b"Test", b"");
1443
1444 let sr1 = start_raw(2026, 6, 10, 10);
1446 let eit1 = eit_pf_section(100, 1, 1, 1, 0, sr1, [1, 0, 0], &desc);
1447 store.feed(&eit1).unwrap();
1448 assert_eq!(store.service_count(), 1);
1449
1450 let sr2 = start_raw(2026, 6, 10, 11);
1452 let eit2 = eit_pf_section(200, 1, 1, 3, 0, sr2, [1, 0, 0], &desc);
1453 store.feed(&eit2).unwrap();
1454 assert_eq!(store.service_count(), 2);
1455
1456 let sr3 = start_raw(2026, 6, 10, 12);
1458 let eit3 = eit_pf_section(300, 1, 1, 5, 0, sr3, [1, 0, 0], &desc);
1459 store.feed(&eit3).unwrap();
1460 assert_eq!(
1461 store.service_count(),
1462 2,
1463 "third service must be rejected when cap is full"
1464 );
1465
1466 let key300 = ServiceKey {
1468 original_network_id: 1,
1469 transport_stream_id: 1,
1470 service_id: 300,
1471 };
1472 assert!(
1473 store.events(key300).is_none(),
1474 "rejected service must not appear"
1475 );
1476
1477 store.clear();
1479 store.feed(&eit3).unwrap();
1480 assert_eq!(store.service_count(), 1);
1481 assert!(store.events(key300).is_some());
1482 }
1483
1484 #[test]
1489 fn max_events_per_service_capped() {
1490 let mut store = EpgStore::new().with_max_events_per_service(3);
1493
1494 let desc = short_event_bytes(b"Test", b"");
1495 let key = ServiceKey {
1496 original_network_id: 1,
1497 transport_stream_id: 1,
1498 service_id: 100,
1499 };
1500
1501 for (version, (event_id, hour)) in [(10, 10u32), (20, 11), (30, 12), (40, 13)]
1502 .iter()
1503 .enumerate()
1504 {
1505 let sr = start_raw(2026, 6, 10, *hour);
1506 let eit = eit_pf_section(100, 1, 1, *event_id, version as u8, sr, [1, 0, 0], &desc);
1507 store.feed(&eit).unwrap();
1508 }
1509
1510 assert_eq!(store.event_count(), 3, "4th event must be skipped at cap 3");
1511
1512 let sr_v2 = start_raw(2026, 6, 10, 15);
1514 let eit_v2 = eit_pf_section(100, 1, 1, 10, 1, sr_v2, [1, 0, 0], &desc);
1515 store.feed(&eit_v2).unwrap();
1516 assert_eq!(
1517 store.event_count(),
1518 3,
1519 "version churn on existing event_id must not increase count"
1520 );
1521
1522 let evts = store.events(key).unwrap();
1523 let ev10 = evts.iter().find(|e| e.event_id == 10).unwrap();
1524 assert_eq!(
1525 ev10.event_name.as_deref(),
1526 Some("Test"),
1527 "existing event updated"
1528 );
1529 }
1530
1531 #[cfg(feature = "serde")]
1536 #[test]
1537 fn serde_serializes_store_as_json() {
1538 let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
1539 let mut store = EpgStore::new();
1540 let key = ServiceKey {
1541 original_network_id: 1,
1542 transport_stream_id: 1,
1543 service_id: 100,
1544 };
1545 let svc = store.cache.entry(key).or_default();
1546 svc.service_name = Some("BBC One".into());
1547 svc.events.insert(
1548 1,
1549 EpgEvent {
1550 event_id: 1,
1551 start_time: Some(t),
1552 duration: Some(core::time::Duration::from_secs(3600)),
1553 running_status: RunningStatus::Running,
1554 free_ca_mode: false,
1555 event_name: Some("The News".into()),
1556 event_text: Some("Today's headlines".into()),
1557 extended_text: None,
1558 extended_items: vec![],
1559 content_nibbles: vec![ContentNibble {
1560 level_1: 1,
1561 level_2: 1,
1562 user: 0,
1563 }],
1564 ratings: vec![],
1565 crids: vec![],
1566 },
1567 );
1568
1569 let json = serde_json::to_string(&store).unwrap();
1570 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1571 let svc_data = &v["1-1-100"];
1572 assert_eq!(svc_data["service_name"], "BBC One");
1573 assert_eq!(svc_data["events"][0]["event_name"], "The News");
1574 assert_eq!(
1575 svc_data["events"][0]["content_nibbles"][0],
1576 serde_json::json!({"level_1": 1, "level_2": 1, "user": 0})
1577 );
1578 }
1579}