1use crate::collect::CollectResult;
87use crate::tables::RunningStatus;
88use std::collections::HashMap;
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
92#[cfg_attr(feature = "serde", derive(serde::Serialize))]
93pub struct ServiceKey {
94 pub original_network_id: u16,
96 pub transport_stream_id: u16,
98 pub service_id: u16,
100}
101
102#[non_exhaustive]
105#[derive(Debug, Clone, PartialEq, Eq)]
106#[cfg_attr(feature = "serde", derive(serde::Serialize))]
107pub struct Rating {
108 pub country: String,
110 pub value: u8,
112}
113
114impl Rating {
115 #[must_use]
117 pub fn minimum_age(&self) -> Option<u8> {
118 match self.value {
119 0x01..=0x0F => Some(self.value + 3),
120 _ => None,
121 }
122 }
123}
124
125pub use crate::descriptors::content_identifier::CridType;
129
130#[non_exhaustive]
133#[derive(Debug, Clone, PartialEq, Eq)]
134#[cfg_attr(feature = "serde", derive(serde::Serialize))]
135pub struct Crid {
136 pub crid_type: CridType,
138 pub crid: String,
140}
141
142#[non_exhaustive]
144#[derive(Debug, Clone, PartialEq, Eq)]
145#[cfg_attr(feature = "serde", derive(serde::Serialize))]
146pub struct ExtendedItem {
147 pub description: String,
149 pub item: String,
151}
152
153#[non_exhaustive]
156#[derive(Debug, Clone, PartialEq, Eq)]
157#[cfg_attr(feature = "serde", derive(serde::Serialize))]
158pub struct ContentNibble {
159 pub level_1: u8,
161 pub level_2: u8,
163 pub user: u8,
165}
166
167impl ContentNibble {
168 #[must_use]
170 pub fn genre(&self) -> crate::descriptors::content::ContentGenre {
171 crate::descriptors::content::ContentGenre::from_nibble_1(self.level_1)
172 }
173
174 #[must_use]
176 pub fn genre_name(&self) -> &'static str {
177 crate::descriptors::content::content_genre_name(self.level_1, self.level_2)
178 }
179}
180
181#[non_exhaustive]
193#[derive(Debug, Clone, PartialEq, Eq)]
194#[cfg_attr(feature = "serde", derive(serde::Serialize))]
195pub struct EpgEvent {
196 pub event_id: u16,
198 pub start_time: Option<chrono::DateTime<chrono::Utc>>,
200 pub duration: Option<core::time::Duration>,
202 pub running_status: RunningStatus,
204 pub free_ca_mode: bool,
206 pub event_name: Option<String>,
210 pub event_text: Option<String>,
212 pub extended_text: Option<String>,
217 #[cfg_attr(feature = "serde", serde(default))]
221 pub extended_items: Vec<ExtendedItem>,
222 #[cfg_attr(feature = "serde", serde(default))]
225 pub content_nibbles: Vec<ContentNibble>,
226 #[cfg_attr(feature = "serde", serde(default))]
229 pub ratings: Vec<Rating>,
230 #[cfg_attr(feature = "serde", serde(default))]
233 pub crids: Vec<Crid>,
234}
235
236#[derive(Debug, Clone)]
238#[cfg_attr(feature = "serde", derive(serde::Serialize))]
239struct ServiceData {
240 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
241 service_name: Option<String>,
242 events: Vec<EpgEvent>,
243}
244
245pub const DEFAULT_MAX_SERVICES: usize = 1024;
251
252pub const DEFAULT_MAX_EVENTS_PER_SERVICE: usize = 8192;
258
259#[derive(Debug)]
301pub struct EpgStore {
302 collector: crate::collect::EitCollector,
303 cache: HashMap<ServiceKey, ServiceEpg>,
304 max_services: usize,
305 max_events_per_service: usize,
306}
307
308impl Default for EpgStore {
309 fn default() -> Self {
310 Self {
311 collector: crate::collect::EitCollector::default(),
312 cache: HashMap::new(),
313 max_services: DEFAULT_MAX_SERVICES,
314 max_events_per_service: DEFAULT_MAX_EVENTS_PER_SERVICE,
315 }
316 }
317}
318
319#[derive(Debug, Default)]
320struct ServiceEpg {
321 service_name: Option<String>,
322 events: HashMap<u16, EpgEvent>,
324}
325
326impl EpgStore {
327 #[must_use]
330 pub fn new() -> Self {
331 Self::default()
332 }
333
334 #[must_use]
339 pub fn with_max_services(mut self, max_services: usize) -> Self {
340 self.max_services = max_services;
341 self
342 }
343
344 #[must_use]
349 pub fn with_max_events_per_service(mut self, max_events_per_service: usize) -> Self {
350 self.max_events_per_service = max_events_per_service;
351 self
352 }
353
354 #[must_use]
358 pub fn with_collector_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
359 self.collector = self.collector.with_max_logical_keys(max_logical_keys);
360 self
361 }
362
363 pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
372 self.feed_with_pid(None, bytes)
373 }
374
375 pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
377 if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
378 let tables = completed.tables()?;
379 for table in &tables {
380 let key = ServiceKey {
381 original_network_id: table.original_network_id,
382 transport_stream_id: table.transport_stream_id,
383 service_id: table.service_id,
384 };
385 if self.cache.len() >= self.max_services && !self.cache.contains_key(&key) {
386 continue;
387 }
388 let svc = self.cache.entry(key).or_default();
389 for event in &table.events {
390 if svc.events.len() >= self.max_events_per_service
391 && !svc.events.contains_key(&event.event_id)
392 {
393 continue;
394 }
395 svc.events.insert(event.event_id, event_to_epg(event));
396 }
397 }
398 }
399 Ok(())
400 }
401
402 pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
407 for svc in &sdt.services {
408 let key = ServiceKey {
409 original_network_id: sdt.original_network_id,
410 transport_stream_id: sdt.transport_stream_id,
411 service_id: svc.service_id,
412 };
413 let entry = self.cache.entry(key).or_default();
414 entry.service_name = extract_service_name(svc.descriptors.descriptors());
415 }
416 }
417
418 pub fn now_and_next(
432 &self,
433 key: ServiceKey,
434 at: chrono::DateTime<chrono::Utc>,
435 ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
436 let Some(svc) = self.cache.get(&key) else {
437 return (None, None);
438 };
439
440 let now = svc.events.values().find(|e| {
441 if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
442 let end = start + dur;
443 return at >= start && at < end;
444 }
445 false
446 });
447
448 let next = svc
449 .events
450 .values()
451 .filter(|e| {
452 if let Some(start) = e.start_time {
453 start > at
454 } else {
455 false
456 }
457 })
458 .min_by_key(|e| e.start_time);
459
460 (now, next)
461 }
462
463 #[must_use]
468 pub fn schedule(
469 &self,
470 key: ServiceKey,
471 from: chrono::DateTime<chrono::Utc>,
472 to: chrono::DateTime<chrono::Utc>,
473 ) -> Option<Vec<&EpgEvent>> {
474 let svc = self.cache.get(&key)?;
475 let mut events: Vec<&EpgEvent> = svc
476 .events
477 .values()
478 .filter(|e| {
479 if let Some(start) = e.start_time {
480 start >= from && start < to
481 } else {
482 false
483 }
484 })
485 .collect();
486 events.sort_by(|a, b| cmp_event_by_start(a, b));
487 Some(events)
488 }
489
490 #[must_use]
492 pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
493 self.cache.get(&key).and_then(|s| s.service_name.as_deref())
494 }
495
496 pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
500 self.cache.keys().copied()
501 }
502
503 #[must_use]
506 pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
507 let svc = self.cache.get(&key)?;
508 let mut events: Vec<&EpgEvent> = svc.events.values().collect();
509 events.sort_by(|a, b| cmp_event_by_start(a, b));
510 Some(events)
511 }
512
513 #[must_use]
515 pub fn service_count(&self) -> usize {
516 self.cache.len()
517 }
518
519 #[must_use]
521 pub fn event_count(&self) -> usize {
522 self.cache.values().map(|s| s.events.len()).sum()
523 }
524
525 pub fn retain_services<F>(&mut self, mut keep: F)
530 where
531 F: FnMut(&ServiceKey) -> bool,
532 {
533 self.cache.retain(|key, _| keep(key));
534 self.collector.retain_logical(|lk| {
535 keep(&ServiceKey {
536 original_network_id: lk.original_network_id,
537 transport_stream_id: lk.transport_stream_id,
538 service_id: lk.service_id,
539 })
540 });
541 }
542
543 pub fn clear(&mut self) {
545 self.collector.clear();
546 self.cache.clear();
547 }
548}
549
550#[cfg(feature = "serde")]
551impl serde::Serialize for EpgStore {
552 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
553 use serde::ser::SerializeMap;
554 let mut m = s.serialize_map(Some(self.cache.len()))?;
555 for (key, svc) in &self.cache {
556 let data = ServiceData {
557 service_name: svc.service_name.clone(),
558 events: {
559 let mut evts: Vec<EpgEvent> = svc.events.values().cloned().collect();
560 evts.sort_by(cmp_event_by_start);
561 evts
562 },
563 };
564 let key_str = format!(
565 "{}-{}-{}",
566 key.original_network_id, key.transport_stream_id, key.service_id
567 );
568 m.serialize_entry(&key_str, &data)?;
569 }
570 m.end()
571 }
572}
573
574fn cmp_event_by_start(a: &EpgEvent, b: &EpgEvent) -> std::cmp::Ordering {
575 match (a.start_time, b.start_time) {
576 (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
577 (Some(_), None) => std::cmp::Ordering::Less,
578 (None, Some(_)) => std::cmp::Ordering::Greater,
579 (None, None) => a.event_id.cmp(&b.event_id),
580 }
581}
582
583fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
584 let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
585 let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
586 let content_nibbles = extract_content(e.descriptors.descriptors());
587 let ratings = extract_ratings(e.descriptors.descriptors());
588 let crids = extract_crids(e.descriptors.descriptors());
589
590 EpgEvent {
591 event_id: e.event_id,
592 start_time: e.start_time(),
593 duration: e.duration(),
594 running_status: e.running_status,
595 free_ca_mode: e.free_ca_mode,
596 event_name,
597 event_text,
598 extended_text,
599 extended_items,
600 content_nibbles,
601 ratings,
602 crids,
603 }
604}
605
606fn extract_short_event(
607 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
608) -> (Option<String>, Option<String>) {
609 for desc in descriptors {
610 if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
611 return (
612 Some(se.event_name.decode().into_owned()),
613 Some(se.text.decode().into_owned()),
614 );
615 }
616 }
617 (None, None)
618}
619
620struct ExtendedFragment {
621 descriptor_number: u8,
622 text: String,
623 items: Vec<ExtendedItem>,
624}
625
626fn extract_extended(
627 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
628) -> (Option<String>, Vec<ExtendedItem>) {
629 use crate::descriptors::AnyDescriptor;
630
631 let mut fragments: Vec<ExtendedFragment> = descriptors
632 .iter()
633 .filter_map(|d| {
634 if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
635 let text = ee.text.decode().into_owned();
636 let items: Vec<ExtendedItem> = ee
637 .items
638 .iter()
639 .map(|i| ExtendedItem {
640 description: i.description.decode().into_owned(),
641 item: i.value.decode().into_owned(),
642 })
643 .collect();
644 if !text.is_empty() || !items.is_empty() {
645 Some(ExtendedFragment {
646 descriptor_number: ee.descriptor_number,
647 text,
648 items,
649 })
650 } else {
651 None
652 }
653 } else {
654 None
655 }
656 })
657 .collect();
658
659 if fragments.is_empty() {
660 return (None, Vec::new());
661 }
662
663 fragments.sort_by_key(|f| f.descriptor_number);
665
666 let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
667
668 let extended_items: Vec<ExtendedItem> = fragments.into_iter().flat_map(|f| f.items).collect();
669
670 let text = if extended_text.is_empty() {
671 None
672 } else {
673 Some(extended_text)
674 };
675
676 (text, extended_items)
677}
678
679fn extract_content(
680 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
681) -> Vec<ContentNibble> {
682 for desc in descriptors {
683 if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
684 return ct
685 .entries
686 .iter()
687 .map(|e| ContentNibble {
688 level_1: e.nibble_1,
689 level_2: e.nibble_2,
690 user: e.user_byte,
691 })
692 .collect();
693 }
694 }
695 Vec::new()
696}
697
698fn extract_ratings(
699 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
700) -> Vec<Rating> {
701 for desc in descriptors {
702 if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
703 return pr
704 .entries
705 .iter()
706 .map(|e| Rating {
707 country: e.country_code.as_str().into_owned(),
708 value: e.rating,
709 })
710 .collect();
711 }
712 }
713 Vec::new()
714}
715
716fn extract_crids(
717 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
718) -> Vec<Crid> {
719 use crate::descriptors::content_identifier::CridLocation;
720 for desc in descriptors {
721 if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
722 return ci
723 .entries
724 .iter()
725 .filter_map(|e| match e.location {
726 CridLocation::Inline(bytes) => {
727 let s = String::from_utf8_lossy(bytes).into_owned();
728 Some(Crid {
729 crid_type: e.crid_type,
730 crid: s,
731 })
732 }
733 _ => None,
734 })
735 .collect();
736 }
737 }
738 Vec::new()
739}
740
741fn extract_service_name(
742 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
743) -> Option<String> {
744 for desc in descriptors {
745 if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
746 return Some(svc.service_name.decode().into_owned());
747 }
748 }
749 None
750}
751
752#[cfg(test)]
753mod tests {
754 use super::*;
755 use chrono::{TimeZone, Utc};
756
757 fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
763 let lang = b"eng";
764 let mut v = Vec::new();
765 v.push(0x4Du8); v.push((3 + 1 + name.len() + 1 + text.len()) as u8); v.extend_from_slice(lang);
768 v.push(name.len() as u8);
769 v.extend_from_slice(name);
770 v.push(text.len() as u8);
771 v.extend_from_slice(text);
772 v
773 }
774
775 #[allow(clippy::too_many_arguments)]
779 fn eit_pf_section(
780 service_id: u16,
781 ts_id: u16,
782 on_id: u16,
783 event_id: u16,
784 version: u8,
785 start_raw: [u8; 5],
786 dur_raw: [u8; 3],
787 descriptors: &[u8],
788 ) -> Vec<u8> {
789 let table_id = 0x4Eu8;
790
791 let ev_len = 12 + descriptors.len();
795 let section_length = 5 + 6 + ev_len + 4;
796 let total = 3 + section_length;
797
798 let mut buf = vec![0u8; total];
799 buf[0] = table_id;
800 buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
801 buf[2] = (section_length & 0xFF) as u8;
802 buf[3..5].copy_from_slice(&service_id.to_be_bytes());
803 buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
805 buf[6] = 0; buf[7] = 0; buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
808 buf[10..12].copy_from_slice(&on_id.to_be_bytes());
809 buf[12] = 0; buf[13] = 0x5F; let ev_off = 14;
814 buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
815 buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
816 buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
817 let dll = descriptors.len() as u16;
818 buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
819 buf[ev_off + 11] = (dll & 0xFF) as u8;
820 buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
821
822 let crc_pos = total - 4;
824 let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
825 buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
826 buf
827 }
828
829 fn start_raw(year: i32, month: u32, day: u32, hour: u32) -> [u8; 5] {
832 let mjd = mjd_approx(year, month, day);
833 let mjd_bytes = mjd.to_be_bytes();
834 let bcd_hour = ((hour / 10 * 16) + (hour % 10)) as u8;
835 [
836 mjd_bytes[0],
837 mjd_bytes[1],
838 bcd_hour,
839 0, 0, ]
842 }
843
844 fn mjd_approx(year: i32, month: u32, day: u32) -> u16 {
846 assert!(
847 (year, month, day) == (2026, 6, 10),
848 "mjd_approx only supports 2026-06-10"
849 );
850 61785
851 }
852
853 fn content_descriptor_bytes(entries: &[(u8, u8, u8)]) -> Vec<u8> {
855 let mut v = vec![0x54u8, (entries.len() * 2) as u8];
856 for &(n1, n2, u) in entries {
857 v.push((n1 << 4) | n2);
858 v.push(u);
859 }
860 v
861 }
862
863 fn parental_rating_bytes(entries: &[([u8; 3], u8)]) -> Vec<u8> {
865 let mut v = vec![0x55u8, (entries.len() * 4) as u8];
866 for (country, rating) in entries {
867 v.extend_from_slice(country);
868 v.push(*rating);
869 }
870 v
871 }
872
873 fn content_identifier_bytes(entries: &[(u8, &[u8])]) -> Vec<u8> {
876 let body_len: usize = entries.iter().map(|(_, data)| 2 + data.len()).sum();
877 let mut v = vec![0x76u8, body_len as u8];
878 for (crid_type, data) in entries {
879 v.push(crid_type << 2); v.push(data.len() as u8);
881 v.extend_from_slice(data);
882 }
883 v
884 }
885
886 #[test]
891 fn new_store_is_empty() {
892 let store = EpgStore::new();
893 assert_eq!(store.service_count(), 0);
894 assert_eq!(store.event_count(), 0);
895 }
896
897 #[test]
898 fn feed_empty_is_error() {
899 let mut store = EpgStore::new();
900 assert!(store.feed(&[]).is_err());
901 }
902
903 #[test]
904 fn now_and_next_no_data_returns_none() {
905 let store = EpgStore::new();
906 let now = Utc::now();
907 let key = ServiceKey {
908 original_network_id: 1,
909 transport_stream_id: 1,
910 service_id: 100,
911 };
912 assert_eq!(store.now_and_next(key, now), (None, None));
913 }
914
915 #[test]
916 fn service_key_ordering() {
917 let a = ServiceKey {
918 original_network_id: 1,
919 transport_stream_id: 2,
920 service_id: 100,
921 };
922 let b = ServiceKey {
923 original_network_id: 1,
924 transport_stream_id: 2,
925 service_id: 200,
926 };
927 assert!(a < b);
928 }
929
930 fn empty_event(
931 id: u16,
932 start: Option<chrono::DateTime<chrono::Utc>>,
933 dur: Option<core::time::Duration>,
934 ) -> EpgEvent {
935 EpgEvent {
936 event_id: id,
937 start_time: start,
938 duration: dur,
939 running_status: RunningStatus::Undefined,
940 free_ca_mode: false,
941 event_name: None,
942 event_text: None,
943 extended_text: None,
944 extended_items: Vec::new(),
945 content_nibbles: Vec::new(),
946 ratings: Vec::new(),
947 crids: Vec::new(),
948 }
949 }
950
951 #[test]
952 fn events_sorts_valid_before_invalid() {
953 let valid = empty_event(
954 1,
955 Some(Utc::now()),
956 Some(core::time::Duration::from_secs(3600)),
957 );
958 let invalid = empty_event(2, None, None);
959
960 let mut events = [&invalid, &valid];
961 events.sort_by(|a, b| cmp_event_by_start(a, b));
962 assert_eq!(events[0].event_id, 1);
963 assert_eq!(events[1].event_id, 2);
964 }
965
966 #[test]
971 fn extended_text_chaining_per_spec_6_2_15() {
972 use crate::descriptors::extended_event::ExtendedEventDescriptor;
973 use crate::descriptors::AnyDescriptor;
974 use crate::text::{DvbText, LangCode};
975
976 let frag1 = ExtendedEventDescriptor {
979 descriptor_number: 2,
980 last_descriptor_number: 3,
981 language_code: LangCode(*b"eng"),
982 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
983 description: DvbText::new(b"Director"),
984 value: DvbText::new(b"Alice"),
985 }],
986 text: DvbText::new(b"The quick "),
987 };
988
989 let frag2 = ExtendedEventDescriptor {
992 descriptor_number: 0,
993 last_descriptor_number: 3,
994 language_code: LangCode(*b"eng"),
995 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
996 description: DvbText::new(b"Year"),
997 value: DvbText::new(b"2026"),
998 }],
999 text: DvbText::new(b"brown fox"),
1000 };
1001
1002 let frag3 = ExtendedEventDescriptor {
1005 descriptor_number: 3,
1006 last_descriptor_number: 3,
1007 language_code: LangCode(*b"eng"),
1008 items: vec![],
1009 text: DvbText::new(b"jumps."),
1010 };
1011
1012 let frag4 = ExtendedEventDescriptor {
1016 descriptor_number: 1,
1017 last_descriptor_number: 3,
1018 language_code: LangCode(*b"eng"),
1019 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
1020 description: DvbText::new(b"Genre"),
1021 value: DvbText::new(b"Thriller"),
1022 }],
1023 text: DvbText::new(b""),
1024 };
1025
1026 let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
1028 Ok(AnyDescriptor::ExtendedEvent(frag1)), Ok(AnyDescriptor::ExtendedEvent(frag4)), Ok(AnyDescriptor::ExtendedEvent(frag3)), Ok(AnyDescriptor::ExtendedEvent(frag2)), ];
1033
1034 let (text, items) = extract_extended(&descriptors);
1035
1036 assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
1038
1039 assert_eq!(items.len(), 3);
1042 assert_eq!(
1043 items[0],
1044 ExtendedItem {
1045 description: "Year".into(),
1046 item: "2026".into()
1047 }
1048 );
1049 assert_eq!(
1050 items[1],
1051 ExtendedItem {
1052 description: "Genre".into(),
1053 item: "Thriller".into()
1054 }
1055 );
1056 assert_eq!(
1057 items[2],
1058 ExtendedItem {
1059 description: "Director".into(),
1060 item: "Alice".into()
1061 }
1062 );
1063 }
1064
1065 #[test]
1070 fn now_and_next_event_boundary() {
1071 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1072 let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1073 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1074
1075 let sec = core::time::Duration::from_secs(3600);
1078 let ev1 = EpgEvent {
1079 event_id: 1,
1080 start_time: Some(t1000),
1081 duration: Some(sec),
1082 running_status: RunningStatus::Undefined,
1083 free_ca_mode: false,
1084 event_name: Some("Event 1".into()),
1085 event_text: None,
1086 extended_text: None,
1087 extended_items: vec![],
1088 content_nibbles: vec![],
1089 ratings: vec![],
1090 crids: vec![],
1091 };
1092 let ev2 = EpgEvent {
1093 event_id: 2,
1094 start_time: Some(t1200),
1095 duration: Some(sec),
1096 running_status: RunningStatus::Undefined,
1097 free_ca_mode: false,
1098 event_name: Some("Event 2".into()),
1099 event_text: None,
1100 extended_text: None,
1101 extended_items: vec![],
1102 content_nibbles: vec![],
1103 ratings: vec![],
1104 crids: vec![],
1105 };
1106
1107 let mut store = EpgStore::new();
1109 let key = ServiceKey {
1110 original_network_id: 1,
1111 transport_stream_id: 1,
1112 service_id: 100,
1113 };
1114 let svc = store.cache.entry(key).or_default();
1115 svc.events.insert(1, ev1);
1116 svc.events.insert(2, ev2);
1117
1118 let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
1120 let (now, next) = store.now_and_next(key, at);
1121 assert_eq!(now.unwrap().event_id, 1);
1122 assert_eq!(next.unwrap().event_id, 2);
1123
1124 let (now, next) = store.now_and_next(key, t1100);
1127 assert!(now.is_none(), "event ending at query time must NOT be now");
1128 assert_eq!(next.unwrap().event_id, 2);
1129
1130 let (now, next) = store.now_and_next(key, t1200);
1133 assert_eq!(now.unwrap().event_id, 2);
1134 assert!(next.is_none());
1135 }
1136
1137 #[test]
1142 fn now_and_next_returns_earliest_future_event() {
1143 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1149 let t1400 = Utc.with_ymd_and_hms(2026, 6, 10, 14, 0, 0).unwrap();
1150 let t1600 = Utc.with_ymd_and_hms(2026, 6, 10, 16, 0, 0).unwrap();
1151 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1152
1153 let sec = core::time::Duration::from_secs(3600);
1154
1155 fn named_event(
1156 id: u16,
1157 start: chrono::DateTime<chrono::Utc>,
1158 dur: core::time::Duration,
1159 name: &str,
1160 ) -> EpgEvent {
1161 EpgEvent {
1162 event_id: id,
1163 start_time: Some(start),
1164 duration: Some(dur),
1165 running_status: RunningStatus::Undefined,
1166 free_ca_mode: false,
1167 event_name: Some(name.into()),
1168 event_text: None,
1169 extended_text: None,
1170 extended_items: vec![],
1171 content_nibbles: vec![],
1172 ratings: vec![],
1173 crids: vec![],
1174 }
1175 }
1176
1177 let mut store = EpgStore::new();
1178 let key = ServiceKey {
1179 original_network_id: 1,
1180 transport_stream_id: 1,
1181 service_id: 100,
1182 };
1183 let svc = store.cache.entry(key).or_default();
1184 svc.events.insert(3, named_event(3, t1400, sec, "Event 14"));
1186 svc.events.insert(1, named_event(1, t1200, sec, "Event 12"));
1187 svc.events.insert(2, named_event(2, t1600, sec, "Event 16"));
1188
1189 let (_now, next) = store.now_and_next(key, t1000);
1191 let next = next.expect("next event must exist");
1192 assert_eq!(
1193 next.event_id, 1,
1194 "next must be earliest future event (12:00), not first in iteration order"
1195 );
1196 }
1197
1198 #[test]
1203 fn extract_content_ratings_crids_through_feed() {
1204 let content = content_descriptor_bytes(&[(3, 1, 0xAA), (4, 2, 0xBB)]);
1205 let ratings = parental_rating_bytes(&[(*b"FRA", 0x05), (*b"GBR", 0x01)]);
1206 let crids = content_identifier_bytes(&[
1207 (0x01, b"crid://bbc.co.uk/prog123"),
1208 (0x03, b"crid://bbc.co.uk/rec456"),
1209 ]);
1210
1211 let mut descriptors = Vec::new();
1212 descriptors.extend_from_slice(&content);
1213 descriptors.extend_from_slice(&ratings);
1214 descriptors.extend_from_slice(&crids);
1215
1216 let sr = start_raw(2026, 6, 10, 10);
1217 let eit = eit_pf_section(100, 1, 1, 1, 0, sr, [1, 0, 0], &descriptors);
1218
1219 let mut store = EpgStore::new();
1220 store.feed(&eit).unwrap();
1221
1222 let key = ServiceKey {
1223 original_network_id: 1,
1224 transport_stream_id: 1,
1225 service_id: 100,
1226 };
1227 let events = store.events(key).unwrap();
1228 assert_eq!(events.len(), 1);
1229 let ev = &events[0];
1230
1231 assert_eq!(ev.content_nibbles.len(), 2);
1232 assert_eq!(
1233 ev.content_nibbles[0],
1234 ContentNibble {
1235 level_1: 3,
1236 level_2: 1,
1237 user: 0xAA
1238 }
1239 );
1240 assert_eq!(
1241 ev.content_nibbles[1],
1242 ContentNibble {
1243 level_1: 4,
1244 level_2: 2,
1245 user: 0xBB
1246 }
1247 );
1248
1249 assert_eq!(ev.ratings.len(), 2);
1250 assert_eq!(ev.ratings[0].country, "FRA");
1251 assert_eq!(ev.ratings[0].value, 0x05);
1252 assert_eq!(ev.ratings[1].country, "GBR");
1253 assert_eq!(ev.ratings[1].value, 0x01);
1254
1255 assert_eq!(ev.crids.len(), 1 + 1); assert_eq!(ev.crids[0].crid_type, CridType::ItemOfContent);
1257 assert_eq!(ev.crids[0].crid, "crid://bbc.co.uk/prog123");
1258 assert_eq!(ev.crids[1].crid_type, CridType::Recommendation);
1259 assert_eq!(ev.crids[1].crid, "crid://bbc.co.uk/rec456");
1260 }
1261
1262 #[test]
1263 fn extract_service_name_through_feed_sdt() {
1264 use crate::collect::SectionSetCollector;
1265
1266 let svc_desc = {
1268 let provider = b"BBC";
1269 let name = b"BBC ONE HD";
1270 let mut v = vec![0x48u8, (1 + 1 + provider.len() + 1 + name.len()) as u8];
1271 v.push(0x01); v.push(provider.len() as u8);
1273 v.extend_from_slice(provider);
1274 v.push(name.len() as u8);
1275 v.extend_from_slice(name);
1276 v
1277 };
1278
1279 let sdt_bytes = {
1281 let dll = svc_desc.len() as u16;
1282 let svc_entry_len = 5 + dll as usize;
1284 let section_length: u16 = 5 + 3 + svc_entry_len as u16 + 4;
1286 let mut buf = vec![0u8; 3 + section_length as usize];
1287 buf[0] = 0x42; buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
1289 buf[2] = (section_length & 0xFF) as u8;
1290 buf[3..5].copy_from_slice(&1u16.to_be_bytes()); buf[5] = 0xC1; buf[6] = 0; buf[7] = 0; buf[8..10].copy_from_slice(&1u16.to_be_bytes()); buf[10] = 0xFF; let off = 11;
1299 buf[off..off + 2].copy_from_slice(&100u16.to_be_bytes()); buf[off + 2] = 0xFC; buf[off + 3] = ((dll >> 8) as u8) & 0x0F;
1302 buf[off + 4] = (dll & 0xFF) as u8;
1303 buf[off + 5..off + 5 + svc_desc.len()].copy_from_slice(&svc_desc);
1304
1305 let crc_off = buf.len() - 4;
1307 let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_off]);
1308 buf[crc_off..].copy_from_slice(&crc.to_be_bytes());
1309 buf
1310 };
1311
1312 let mut collector = SectionSetCollector::new();
1313 let complete = collector.push_section(&sdt_bytes).unwrap().unwrap();
1314 let sdt = complete.sdt().unwrap();
1315
1316 let mut store = EpgStore::new();
1317 store.feed_sdt(&sdt);
1318
1319 let key = ServiceKey {
1320 original_network_id: 1,
1321 transport_stream_id: 1,
1322 service_id: 100,
1323 };
1324 assert_eq!(store.service_name(key), Some("BBC ONE HD"));
1325 assert_eq!(store.service_count(), 1);
1326 }
1327
1328 #[test]
1333 fn version_churn_bounded_growth() {
1334 let s = |hh: u32| {
1337 let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
1338 let days = 61785u16; let mjd_bytes = days.to_be_bytes();
1340 let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
1341 (
1342 [
1343 mjd_bytes[0],
1344 mjd_bytes[1],
1345 bcd_time[0],
1346 bcd_time[1],
1347 bcd_time[2],
1348 ],
1349 t,
1350 )
1351 };
1352
1353 let (start1, _) = s(10);
1354 let (start2, _) = s(14);
1355
1356 let desc1 = short_event_bytes(b"News at 10", b"");
1357 let desc2 = short_event_bytes(b"News at 14", b"");
1358
1359 let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
1360 let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
1361
1362 let mut store = EpgStore::new();
1363 store.feed(&eit1).unwrap();
1364 assert_eq!(store.event_count(), 1);
1365 store.feed(&eit2).unwrap();
1366 assert_eq!(store.event_count(), 1);
1368
1369 let key = ServiceKey {
1370 original_network_id: 1,
1371 transport_stream_id: 1,
1372 service_id: 100,
1373 };
1374 let evts = store.events(key).unwrap();
1375 assert_eq!(evts.len(), 1);
1376 assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
1377 }
1378
1379 #[test]
1384 fn schedule_range_query() {
1385 let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
1386 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1387 let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1388 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1389
1390 let sec = core::time::Duration::from_secs(1800);
1391 let mut store = EpgStore::new();
1392 let key = ServiceKey {
1393 original_network_id: 1,
1394 transport_stream_id: 1,
1395 service_id: 100,
1396 };
1397 let svc = store.cache.entry(key).or_default();
1398 for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
1399 svc.events.insert(
1400 id,
1401 EpgEvent {
1402 event_id: id,
1403 start_time: Some(t),
1404 duration: Some(sec),
1405 running_status: RunningStatus::Undefined,
1406 free_ca_mode: false,
1407 event_name: Some(format!("Event {id}")),
1408 event_text: None,
1409 extended_text: None,
1410 extended_items: vec![],
1411 content_nibbles: vec![],
1412 ratings: vec![],
1413 crids: vec![],
1414 },
1415 );
1416 }
1417
1418 let events = store.schedule(key, t1000, t1200).unwrap();
1420 assert_eq!(events.len(), 2);
1421 assert_eq!(events[0].event_id, 2);
1422 assert_eq!(events[1].event_id, 3);
1423
1424 let events = store.schedule(key, t1200, t1100).unwrap();
1426 assert!(events.is_empty());
1427 }
1428
1429 #[test]
1434 fn max_services_capped() {
1435 let mut store = EpgStore::new().with_max_services(2);
1438
1439 let desc = short_event_bytes(b"Test", b"");
1440
1441 let sr1 = start_raw(2026, 6, 10, 10);
1443 let eit1 = eit_pf_section(100, 1, 1, 1, 0, sr1, [1, 0, 0], &desc);
1444 store.feed(&eit1).unwrap();
1445 assert_eq!(store.service_count(), 1);
1446
1447 let sr2 = start_raw(2026, 6, 10, 11);
1449 let eit2 = eit_pf_section(200, 1, 1, 3, 0, sr2, [1, 0, 0], &desc);
1450 store.feed(&eit2).unwrap();
1451 assert_eq!(store.service_count(), 2);
1452
1453 let sr3 = start_raw(2026, 6, 10, 12);
1455 let eit3 = eit_pf_section(300, 1, 1, 5, 0, sr3, [1, 0, 0], &desc);
1456 store.feed(&eit3).unwrap();
1457 assert_eq!(
1458 store.service_count(),
1459 2,
1460 "third service must be rejected when cap is full"
1461 );
1462
1463 let key300 = ServiceKey {
1465 original_network_id: 1,
1466 transport_stream_id: 1,
1467 service_id: 300,
1468 };
1469 assert!(
1470 store.events(key300).is_none(),
1471 "rejected service must not appear"
1472 );
1473
1474 store.clear();
1476 store.feed(&eit3).unwrap();
1477 assert_eq!(store.service_count(), 1);
1478 assert!(store.events(key300).is_some());
1479 }
1480
1481 #[test]
1486 fn max_events_per_service_capped() {
1487 let mut store = EpgStore::new().with_max_events_per_service(3);
1490
1491 let desc = short_event_bytes(b"Test", b"");
1492 let key = ServiceKey {
1493 original_network_id: 1,
1494 transport_stream_id: 1,
1495 service_id: 100,
1496 };
1497
1498 for (version, (event_id, hour)) in [(10, 10u32), (20, 11), (30, 12), (40, 13)]
1499 .iter()
1500 .enumerate()
1501 {
1502 let sr = start_raw(2026, 6, 10, *hour);
1503 let eit = eit_pf_section(100, 1, 1, *event_id, version as u8, sr, [1, 0, 0], &desc);
1504 store.feed(&eit).unwrap();
1505 }
1506
1507 assert_eq!(store.event_count(), 3, "4th event must be skipped at cap 3");
1508
1509 let sr_v2 = start_raw(2026, 6, 10, 15);
1511 let eit_v2 = eit_pf_section(100, 1, 1, 10, 1, sr_v2, [1, 0, 0], &desc);
1512 store.feed(&eit_v2).unwrap();
1513 assert_eq!(
1514 store.event_count(),
1515 3,
1516 "version churn on existing event_id must not increase count"
1517 );
1518
1519 let evts = store.events(key).unwrap();
1520 let ev10 = evts.iter().find(|e| e.event_id == 10).unwrap();
1521 assert_eq!(
1522 ev10.event_name.as_deref(),
1523 Some("Test"),
1524 "existing event updated"
1525 );
1526 }
1527
1528 #[cfg(feature = "serde")]
1533 #[test]
1534 fn serde_serializes_store_as_json() {
1535 let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
1536 let mut store = EpgStore::new();
1537 let key = ServiceKey {
1538 original_network_id: 1,
1539 transport_stream_id: 1,
1540 service_id: 100,
1541 };
1542 let svc = store.cache.entry(key).or_default();
1543 svc.service_name = Some("BBC One".into());
1544 svc.events.insert(
1545 1,
1546 EpgEvent {
1547 event_id: 1,
1548 start_time: Some(t),
1549 duration: Some(core::time::Duration::from_secs(3600)),
1550 running_status: RunningStatus::Running,
1551 free_ca_mode: false,
1552 event_name: Some("The News".into()),
1553 event_text: Some("Today's headlines".into()),
1554 extended_text: None,
1555 extended_items: vec![],
1556 content_nibbles: vec![ContentNibble {
1557 level_1: 1,
1558 level_2: 1,
1559 user: 0,
1560 }],
1561 ratings: vec![],
1562 crids: vec![],
1563 },
1564 );
1565
1566 let json = serde_json::to_string(&store).unwrap();
1567 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1568 let svc_data = &v["1-1-100"];
1569 assert_eq!(svc_data["service_name"], "BBC One");
1570 assert_eq!(svc_data["events"][0]["event_name"], "The News");
1571 assert_eq!(
1572 svc_data["events"][0]["content_nibbles"][0],
1573 serde_json::json!({"level_1": 1, "level_2": 1, "user": 0})
1574 );
1575 }
1576}