1use crate::collect::CollectResult;
87use std::collections::HashMap;
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
91#[cfg_attr(feature = "serde", derive(serde::Serialize))]
92pub struct ServiceKey {
93 pub original_network_id: u16,
95 pub transport_stream_id: u16,
97 pub service_id: u16,
99}
100
101#[non_exhaustive]
104#[derive(Debug, Clone, PartialEq, Eq)]
105#[cfg_attr(feature = "serde", derive(serde::Serialize))]
106pub struct Rating {
107 pub country: String,
109 pub value: u8,
111}
112
113#[non_exhaustive]
116#[derive(Debug, Clone, PartialEq, Eq)]
117#[cfg_attr(feature = "serde", derive(serde::Serialize))]
118pub struct Crid {
119 pub crid_type: u8,
121 pub crid: String,
123}
124
125#[non_exhaustive]
127#[derive(Debug, Clone, PartialEq, Eq)]
128#[cfg_attr(feature = "serde", derive(serde::Serialize))]
129pub struct ExtendedItem {
130 pub description: String,
132 pub item: String,
134}
135
136#[non_exhaustive]
139#[derive(Debug, Clone, PartialEq, Eq)]
140#[cfg_attr(feature = "serde", derive(serde::Serialize))]
141pub struct ContentNibble {
142 pub level_1: u8,
144 pub level_2: u8,
146 pub user: u8,
148}
149
150#[non_exhaustive]
162#[derive(Debug, Clone, PartialEq, Eq)]
163#[cfg_attr(feature = "serde", derive(serde::Serialize))]
164pub struct EpgEvent {
165 pub event_id: u16,
167 pub start_time: Option<chrono::DateTime<chrono::Utc>>,
169 pub duration: Option<core::time::Duration>,
171 pub running_status: u8,
173 pub free_ca_mode: bool,
175 pub event_name: Option<String>,
179 pub event_text: Option<String>,
181 pub extended_text: Option<String>,
186 #[cfg_attr(feature = "serde", serde(default))]
190 pub extended_items: Vec<ExtendedItem>,
191 #[cfg_attr(feature = "serde", serde(default))]
194 pub content_nibbles: Vec<ContentNibble>,
195 #[cfg_attr(feature = "serde", serde(default))]
198 pub ratings: Vec<Rating>,
199 #[cfg_attr(feature = "serde", serde(default))]
202 pub crids: Vec<Crid>,
203}
204
205#[derive(Debug, Clone)]
207#[cfg_attr(feature = "serde", derive(serde::Serialize))]
208struct ServiceData {
209 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
210 service_name: Option<String>,
211 events: Vec<EpgEvent>,
212}
213
214pub const DEFAULT_MAX_SERVICES: usize = 1024;
220
221pub const DEFAULT_MAX_EVENTS_PER_SERVICE: usize = 8192;
227
228#[derive(Debug)]
270pub struct EpgStore {
271 collector: crate::collect::EitCollector,
272 cache: HashMap<ServiceKey, ServiceEpg>,
273 max_services: usize,
274 max_events_per_service: usize,
275}
276
277impl Default for EpgStore {
278 fn default() -> Self {
279 Self {
280 collector: crate::collect::EitCollector::default(),
281 cache: HashMap::new(),
282 max_services: DEFAULT_MAX_SERVICES,
283 max_events_per_service: DEFAULT_MAX_EVENTS_PER_SERVICE,
284 }
285 }
286}
287
288#[derive(Debug, Default)]
289struct ServiceEpg {
290 service_name: Option<String>,
291 events: HashMap<u16, EpgEvent>,
293}
294
295impl EpgStore {
296 #[must_use]
299 pub fn new() -> Self {
300 Self::default()
301 }
302
303 #[must_use]
308 pub fn with_max_services(mut self, max_services: usize) -> Self {
309 self.max_services = max_services;
310 self
311 }
312
313 #[must_use]
318 pub fn with_max_events_per_service(mut self, max_events_per_service: usize) -> Self {
319 self.max_events_per_service = max_events_per_service;
320 self
321 }
322
323 #[must_use]
327 pub fn with_collector_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
328 self.collector = self.collector.with_max_logical_keys(max_logical_keys);
329 self
330 }
331
332 pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
341 self.feed_with_pid(None, bytes)
342 }
343
344 pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
346 if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
347 let tables = completed.tables()?;
348 for table in &tables {
349 let key = ServiceKey {
350 original_network_id: table.original_network_id,
351 transport_stream_id: table.transport_stream_id,
352 service_id: table.service_id,
353 };
354 if self.cache.len() >= self.max_services && !self.cache.contains_key(&key) {
355 continue;
356 }
357 let svc = self.cache.entry(key).or_default();
358 for event in &table.events {
359 if svc.events.len() >= self.max_events_per_service
360 && !svc.events.contains_key(&event.event_id)
361 {
362 continue;
363 }
364 svc.events.insert(event.event_id, event_to_epg(event));
365 }
366 }
367 }
368 Ok(())
369 }
370
371 pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
376 for svc in &sdt.services {
377 let key = ServiceKey {
378 original_network_id: sdt.original_network_id,
379 transport_stream_id: sdt.transport_stream_id,
380 service_id: svc.service_id,
381 };
382 let entry = self.cache.entry(key).or_default();
383 entry.service_name = extract_service_name(svc.descriptors.descriptors());
384 }
385 }
386
387 pub fn now_and_next(
401 &self,
402 key: ServiceKey,
403 at: chrono::DateTime<chrono::Utc>,
404 ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
405 let Some(svc) = self.cache.get(&key) else {
406 return (None, None);
407 };
408
409 let now = svc.events.values().find(|e| {
410 if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
411 let end = start + dur;
412 return at >= start && at < end;
413 }
414 false
415 });
416
417 let next = svc
418 .events
419 .values()
420 .filter(|e| {
421 if let Some(start) = e.start_time {
422 start > at
423 } else {
424 false
425 }
426 })
427 .min_by_key(|e| e.start_time);
428
429 (now, next)
430 }
431
432 #[must_use]
437 pub fn schedule(
438 &self,
439 key: ServiceKey,
440 from: chrono::DateTime<chrono::Utc>,
441 to: chrono::DateTime<chrono::Utc>,
442 ) -> Option<Vec<&EpgEvent>> {
443 let svc = self.cache.get(&key)?;
444 let mut events: Vec<&EpgEvent> = svc
445 .events
446 .values()
447 .filter(|e| {
448 if let Some(start) = e.start_time {
449 start >= from && start < to
450 } else {
451 false
452 }
453 })
454 .collect();
455 events.sort_by(|a, b| cmp_event_by_start(a, b));
456 Some(events)
457 }
458
459 #[must_use]
461 pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
462 self.cache.get(&key).and_then(|s| s.service_name.as_deref())
463 }
464
465 pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
469 self.cache.keys().copied()
470 }
471
472 #[must_use]
475 pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
476 let svc = self.cache.get(&key)?;
477 let mut events: Vec<&EpgEvent> = svc.events.values().collect();
478 events.sort_by(|a, b| cmp_event_by_start(a, b));
479 Some(events)
480 }
481
482 #[must_use]
484 pub fn service_count(&self) -> usize {
485 self.cache.len()
486 }
487
488 #[must_use]
490 pub fn event_count(&self) -> usize {
491 self.cache.values().map(|s| s.events.len()).sum()
492 }
493
494 pub fn retain_services<F>(&mut self, mut keep: F)
499 where
500 F: FnMut(&ServiceKey) -> bool,
501 {
502 self.cache.retain(|key, _| keep(key));
503 self.collector.retain_logical(|lk| {
504 keep(&ServiceKey {
505 original_network_id: lk.original_network_id,
506 transport_stream_id: lk.transport_stream_id,
507 service_id: lk.service_id,
508 })
509 });
510 }
511
512 pub fn clear(&mut self) {
514 self.collector.clear();
515 self.cache.clear();
516 }
517}
518
519#[cfg(feature = "serde")]
520impl serde::Serialize for EpgStore {
521 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
522 use serde::ser::SerializeMap;
523 let mut m = s.serialize_map(Some(self.cache.len()))?;
524 for (key, svc) in &self.cache {
525 let data = ServiceData {
526 service_name: svc.service_name.clone(),
527 events: {
528 let mut evts: Vec<EpgEvent> = svc.events.values().cloned().collect();
529 evts.sort_by(cmp_event_by_start);
530 evts
531 },
532 };
533 let key_str = format!(
534 "{}-{}-{}",
535 key.original_network_id, key.transport_stream_id, key.service_id
536 );
537 m.serialize_entry(&key_str, &data)?;
538 }
539 m.end()
540 }
541}
542
543fn cmp_event_by_start(a: &EpgEvent, b: &EpgEvent) -> std::cmp::Ordering {
544 match (a.start_time, b.start_time) {
545 (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
546 (Some(_), None) => std::cmp::Ordering::Less,
547 (None, Some(_)) => std::cmp::Ordering::Greater,
548 (None, None) => a.event_id.cmp(&b.event_id),
549 }
550}
551
552fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
553 let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
554 let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
555 let content_nibbles = extract_content(e.descriptors.descriptors());
556 let ratings = extract_ratings(e.descriptors.descriptors());
557 let crids = extract_crids(e.descriptors.descriptors());
558
559 EpgEvent {
560 event_id: e.event_id,
561 start_time: e.start_time(),
562 duration: e.duration(),
563 running_status: e.running_status,
564 free_ca_mode: e.free_ca_mode,
565 event_name,
566 event_text,
567 extended_text,
568 extended_items,
569 content_nibbles,
570 ratings,
571 crids,
572 }
573}
574
575fn extract_short_event(
576 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
577) -> (Option<String>, Option<String>) {
578 for desc in descriptors {
579 if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
580 return (
581 Some(se.event_name.decode().into_owned()),
582 Some(se.text.decode().into_owned()),
583 );
584 }
585 }
586 (None, None)
587}
588
589struct ExtendedFragment {
590 descriptor_number: u8,
591 text: String,
592 items: Vec<ExtendedItem>,
593}
594
595fn extract_extended(
596 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
597) -> (Option<String>, Vec<ExtendedItem>) {
598 use crate::descriptors::AnyDescriptor;
599
600 let mut fragments: Vec<ExtendedFragment> = descriptors
601 .iter()
602 .filter_map(|d| {
603 if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
604 let text = ee.text.decode().into_owned();
605 let items: Vec<ExtendedItem> = ee
606 .items
607 .iter()
608 .map(|i| ExtendedItem {
609 description: i.description.decode().into_owned(),
610 item: i.value.decode().into_owned(),
611 })
612 .collect();
613 if !text.is_empty() || !items.is_empty() {
614 Some(ExtendedFragment {
615 descriptor_number: ee.descriptor_number,
616 text,
617 items,
618 })
619 } else {
620 None
621 }
622 } else {
623 None
624 }
625 })
626 .collect();
627
628 if fragments.is_empty() {
629 return (None, Vec::new());
630 }
631
632 fragments.sort_by_key(|f| f.descriptor_number);
634
635 let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
636
637 let extended_items: Vec<ExtendedItem> = fragments.into_iter().flat_map(|f| f.items).collect();
638
639 let text = if extended_text.is_empty() {
640 None
641 } else {
642 Some(extended_text)
643 };
644
645 (text, extended_items)
646}
647
648fn extract_content(
649 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
650) -> Vec<ContentNibble> {
651 for desc in descriptors {
652 if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
653 return ct
654 .entries
655 .iter()
656 .map(|e| ContentNibble {
657 level_1: e.nibble_1,
658 level_2: e.nibble_2,
659 user: e.user_byte,
660 })
661 .collect();
662 }
663 }
664 Vec::new()
665}
666
667fn extract_ratings(
668 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
669) -> Vec<Rating> {
670 for desc in descriptors {
671 if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
672 return pr
673 .entries
674 .iter()
675 .map(|e| Rating {
676 country: e.country_code.as_str().into_owned(),
677 value: e.rating,
678 })
679 .collect();
680 }
681 }
682 Vec::new()
683}
684
685fn extract_crids(
686 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
687) -> Vec<Crid> {
688 use crate::descriptors::content_identifier::CridLocation;
689 for desc in descriptors {
690 if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
691 return ci
692 .entries
693 .iter()
694 .filter_map(|e| match e.location {
695 CridLocation::Inline(bytes) => {
696 let s = String::from_utf8_lossy(bytes).into_owned();
697 Some(Crid {
698 crid_type: e.crid_type,
699 crid: s,
700 })
701 }
702 _ => None,
703 })
704 .collect();
705 }
706 }
707 Vec::new()
708}
709
710fn extract_service_name(
711 descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
712) -> Option<String> {
713 for desc in descriptors {
714 if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
715 return Some(svc.service_name.decode().into_owned());
716 }
717 }
718 None
719}
720
721#[cfg(test)]
722mod tests {
723 use super::*;
724 use chrono::{TimeZone, Utc};
725
726 fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
732 let lang = b"eng";
733 let mut v = Vec::new();
734 v.push(0x4Du8); v.push((3 + 1 + name.len() + 1 + text.len()) as u8); v.extend_from_slice(lang);
737 v.push(name.len() as u8);
738 v.extend_from_slice(name);
739 v.push(text.len() as u8);
740 v.extend_from_slice(text);
741 v
742 }
743
744 #[allow(clippy::too_many_arguments)]
748 fn eit_pf_section(
749 service_id: u16,
750 ts_id: u16,
751 on_id: u16,
752 event_id: u16,
753 version: u8,
754 start_raw: [u8; 5],
755 dur_raw: [u8; 3],
756 descriptors: &[u8],
757 ) -> Vec<u8> {
758 let table_id = 0x4Eu8;
759
760 let ev_len = 12 + descriptors.len();
764 let section_length = 5 + 6 + ev_len + 4;
765 let total = 3 + section_length;
766
767 let mut buf = vec![0u8; total];
768 buf[0] = table_id;
769 buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
770 buf[2] = (section_length & 0xFF) as u8;
771 buf[3..5].copy_from_slice(&service_id.to_be_bytes());
772 buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
774 buf[6] = 0; buf[7] = 0; buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
777 buf[10..12].copy_from_slice(&on_id.to_be_bytes());
778 buf[12] = 0; buf[13] = 0x5F; let ev_off = 14;
783 buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
784 buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
785 buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
786 let dll = descriptors.len() as u16;
787 buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
788 buf[ev_off + 11] = (dll & 0xFF) as u8;
789 buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
790
791 let crc_pos = total - 4;
793 let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
794 buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
795 buf
796 }
797
798 fn start_raw(year: i32, month: u32, day: u32, hour: u32) -> [u8; 5] {
801 let mjd = mjd_approx(year, month, day);
802 let mjd_bytes = mjd.to_be_bytes();
803 let bcd_hour = ((hour / 10 * 16) + (hour % 10)) as u8;
804 [
805 mjd_bytes[0],
806 mjd_bytes[1],
807 bcd_hour,
808 0, 0, ]
811 }
812
813 fn mjd_approx(year: i32, month: u32, day: u32) -> u16 {
815 assert!(
816 (year, month, day) == (2026, 6, 10),
817 "mjd_approx only supports 2026-06-10"
818 );
819 61785
820 }
821
822 fn content_descriptor_bytes(entries: &[(u8, u8, u8)]) -> Vec<u8> {
824 let mut v = vec![0x54u8, (entries.len() * 2) as u8];
825 for &(n1, n2, u) in entries {
826 v.push((n1 << 4) | n2);
827 v.push(u);
828 }
829 v
830 }
831
832 fn parental_rating_bytes(entries: &[([u8; 3], u8)]) -> Vec<u8> {
834 let mut v = vec![0x55u8, (entries.len() * 4) as u8];
835 for (country, rating) in entries {
836 v.extend_from_slice(country);
837 v.push(*rating);
838 }
839 v
840 }
841
842 fn content_identifier_bytes(entries: &[(u8, &[u8])]) -> Vec<u8> {
845 let body_len: usize = entries.iter().map(|(_, data)| 2 + data.len()).sum();
846 let mut v = vec![0x76u8, body_len as u8];
847 for (crid_type, data) in entries {
848 v.push(crid_type << 2); v.push(data.len() as u8);
850 v.extend_from_slice(data);
851 }
852 v
853 }
854
855 #[test]
860 fn new_store_is_empty() {
861 let store = EpgStore::new();
862 assert_eq!(store.service_count(), 0);
863 assert_eq!(store.event_count(), 0);
864 }
865
866 #[test]
867 fn feed_empty_is_error() {
868 let mut store = EpgStore::new();
869 assert!(store.feed(&[]).is_err());
870 }
871
872 #[test]
873 fn now_and_next_no_data_returns_none() {
874 let store = EpgStore::new();
875 let now = Utc::now();
876 let key = ServiceKey {
877 original_network_id: 1,
878 transport_stream_id: 1,
879 service_id: 100,
880 };
881 assert_eq!(store.now_and_next(key, now), (None, None));
882 }
883
884 #[test]
885 fn service_key_ordering() {
886 let a = ServiceKey {
887 original_network_id: 1,
888 transport_stream_id: 2,
889 service_id: 100,
890 };
891 let b = ServiceKey {
892 original_network_id: 1,
893 transport_stream_id: 2,
894 service_id: 200,
895 };
896 assert!(a < b);
897 }
898
899 fn empty_event(
900 id: u16,
901 start: Option<chrono::DateTime<chrono::Utc>>,
902 dur: Option<core::time::Duration>,
903 ) -> EpgEvent {
904 EpgEvent {
905 event_id: id,
906 start_time: start,
907 duration: dur,
908 running_status: 0,
909 free_ca_mode: false,
910 event_name: None,
911 event_text: None,
912 extended_text: None,
913 extended_items: Vec::new(),
914 content_nibbles: Vec::new(),
915 ratings: Vec::new(),
916 crids: Vec::new(),
917 }
918 }
919
920 #[test]
921 fn events_sorts_valid_before_invalid() {
922 let valid = empty_event(
923 1,
924 Some(Utc::now()),
925 Some(core::time::Duration::from_secs(3600)),
926 );
927 let invalid = empty_event(2, None, None);
928
929 let mut events = [&invalid, &valid];
930 events.sort_by(|a, b| cmp_event_by_start(a, b));
931 assert_eq!(events[0].event_id, 1);
932 assert_eq!(events[1].event_id, 2);
933 }
934
935 #[test]
940 fn extended_text_chaining_per_spec_6_2_15() {
941 use crate::descriptors::extended_event::ExtendedEventDescriptor;
942 use crate::descriptors::AnyDescriptor;
943 use crate::text::{DvbText, LangCode};
944
945 let frag1 = ExtendedEventDescriptor {
948 descriptor_number: 2,
949 last_descriptor_number: 3,
950 language_code: LangCode(*b"eng"),
951 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
952 description: DvbText::new(b"Director"),
953 value: DvbText::new(b"Alice"),
954 }],
955 text: DvbText::new(b"The quick "),
956 };
957
958 let frag2 = ExtendedEventDescriptor {
961 descriptor_number: 0,
962 last_descriptor_number: 3,
963 language_code: LangCode(*b"eng"),
964 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
965 description: DvbText::new(b"Year"),
966 value: DvbText::new(b"2026"),
967 }],
968 text: DvbText::new(b"brown fox"),
969 };
970
971 let frag3 = ExtendedEventDescriptor {
974 descriptor_number: 3,
975 last_descriptor_number: 3,
976 language_code: LangCode(*b"eng"),
977 items: vec![],
978 text: DvbText::new(b"jumps."),
979 };
980
981 let frag4 = ExtendedEventDescriptor {
985 descriptor_number: 1,
986 last_descriptor_number: 3,
987 language_code: LangCode(*b"eng"),
988 items: vec![crate::descriptors::extended_event::ExtendedEventItem {
989 description: DvbText::new(b"Genre"),
990 value: DvbText::new(b"Thriller"),
991 }],
992 text: DvbText::new(b""),
993 };
994
995 let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
997 Ok(AnyDescriptor::ExtendedEvent(frag1)), Ok(AnyDescriptor::ExtendedEvent(frag4)), Ok(AnyDescriptor::ExtendedEvent(frag3)), Ok(AnyDescriptor::ExtendedEvent(frag2)), ];
1002
1003 let (text, items) = extract_extended(&descriptors);
1004
1005 assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
1007
1008 assert_eq!(items.len(), 3);
1011 assert_eq!(
1012 items[0],
1013 ExtendedItem {
1014 description: "Year".into(),
1015 item: "2026".into()
1016 }
1017 );
1018 assert_eq!(
1019 items[1],
1020 ExtendedItem {
1021 description: "Genre".into(),
1022 item: "Thriller".into()
1023 }
1024 );
1025 assert_eq!(
1026 items[2],
1027 ExtendedItem {
1028 description: "Director".into(),
1029 item: "Alice".into()
1030 }
1031 );
1032 }
1033
1034 #[test]
1039 fn now_and_next_event_boundary() {
1040 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1041 let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1042 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1043
1044 let sec = core::time::Duration::from_secs(3600);
1047 let ev1 = EpgEvent {
1048 event_id: 1,
1049 start_time: Some(t1000),
1050 duration: Some(sec),
1051 running_status: 0,
1052 free_ca_mode: false,
1053 event_name: Some("Event 1".into()),
1054 event_text: None,
1055 extended_text: None,
1056 extended_items: vec![],
1057 content_nibbles: vec![],
1058 ratings: vec![],
1059 crids: vec![],
1060 };
1061 let ev2 = EpgEvent {
1062 event_id: 2,
1063 start_time: Some(t1200),
1064 duration: Some(sec),
1065 running_status: 0,
1066 free_ca_mode: false,
1067 event_name: Some("Event 2".into()),
1068 event_text: None,
1069 extended_text: None,
1070 extended_items: vec![],
1071 content_nibbles: vec![],
1072 ratings: vec![],
1073 crids: vec![],
1074 };
1075
1076 let mut store = EpgStore::new();
1078 let key = ServiceKey {
1079 original_network_id: 1,
1080 transport_stream_id: 1,
1081 service_id: 100,
1082 };
1083 let svc = store.cache.entry(key).or_default();
1084 svc.events.insert(1, ev1);
1085 svc.events.insert(2, ev2);
1086
1087 let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
1089 let (now, next) = store.now_and_next(key, at);
1090 assert_eq!(now.unwrap().event_id, 1);
1091 assert_eq!(next.unwrap().event_id, 2);
1092
1093 let (now, next) = store.now_and_next(key, t1100);
1096 assert!(now.is_none(), "event ending at query time must NOT be now");
1097 assert_eq!(next.unwrap().event_id, 2);
1098
1099 let (now, next) = store.now_and_next(key, t1200);
1102 assert_eq!(now.unwrap().event_id, 2);
1103 assert!(next.is_none());
1104 }
1105
1106 #[test]
1111 fn now_and_next_returns_earliest_future_event() {
1112 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1118 let t1400 = Utc.with_ymd_and_hms(2026, 6, 10, 14, 0, 0).unwrap();
1119 let t1600 = Utc.with_ymd_and_hms(2026, 6, 10, 16, 0, 0).unwrap();
1120 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1121
1122 let sec = core::time::Duration::from_secs(3600);
1123
1124 fn named_event(
1125 id: u16,
1126 start: chrono::DateTime<chrono::Utc>,
1127 dur: core::time::Duration,
1128 name: &str,
1129 ) -> EpgEvent {
1130 EpgEvent {
1131 event_id: id,
1132 start_time: Some(start),
1133 duration: Some(dur),
1134 running_status: 0,
1135 free_ca_mode: false,
1136 event_name: Some(name.into()),
1137 event_text: None,
1138 extended_text: None,
1139 extended_items: vec![],
1140 content_nibbles: vec![],
1141 ratings: vec![],
1142 crids: vec![],
1143 }
1144 }
1145
1146 let mut store = EpgStore::new();
1147 let key = ServiceKey {
1148 original_network_id: 1,
1149 transport_stream_id: 1,
1150 service_id: 100,
1151 };
1152 let svc = store.cache.entry(key).or_default();
1153 svc.events.insert(3, named_event(3, t1400, sec, "Event 14"));
1155 svc.events.insert(1, named_event(1, t1200, sec, "Event 12"));
1156 svc.events.insert(2, named_event(2, t1600, sec, "Event 16"));
1157
1158 let (_now, next) = store.now_and_next(key, t1000);
1160 let next = next.expect("next event must exist");
1161 assert_eq!(
1162 next.event_id, 1,
1163 "next must be earliest future event (12:00), not first in iteration order"
1164 );
1165 }
1166
1167 #[test]
1172 fn extract_content_ratings_crids_through_feed() {
1173 let content = content_descriptor_bytes(&[(3, 1, 0xAA), (4, 2, 0xBB)]);
1174 let ratings = parental_rating_bytes(&[(*b"FRA", 0x05), (*b"GBR", 0x01)]);
1175 let crids = content_identifier_bytes(&[
1176 (0x01, b"crid://bbc.co.uk/prog123"),
1177 (0x03, b"crid://bbc.co.uk/rec456"),
1178 ]);
1179
1180 let mut descriptors = Vec::new();
1181 descriptors.extend_from_slice(&content);
1182 descriptors.extend_from_slice(&ratings);
1183 descriptors.extend_from_slice(&crids);
1184
1185 let sr = start_raw(2026, 6, 10, 10);
1186 let eit = eit_pf_section(100, 1, 1, 1, 0, sr, [1, 0, 0], &descriptors);
1187
1188 let mut store = EpgStore::new();
1189 store.feed(&eit).unwrap();
1190
1191 let key = ServiceKey {
1192 original_network_id: 1,
1193 transport_stream_id: 1,
1194 service_id: 100,
1195 };
1196 let events = store.events(key).unwrap();
1197 assert_eq!(events.len(), 1);
1198 let ev = &events[0];
1199
1200 assert_eq!(ev.content_nibbles.len(), 2);
1201 assert_eq!(
1202 ev.content_nibbles[0],
1203 ContentNibble {
1204 level_1: 3,
1205 level_2: 1,
1206 user: 0xAA
1207 }
1208 );
1209 assert_eq!(
1210 ev.content_nibbles[1],
1211 ContentNibble {
1212 level_1: 4,
1213 level_2: 2,
1214 user: 0xBB
1215 }
1216 );
1217
1218 assert_eq!(ev.ratings.len(), 2);
1219 assert_eq!(ev.ratings[0].country, "FRA");
1220 assert_eq!(ev.ratings[0].value, 0x05);
1221 assert_eq!(ev.ratings[1].country, "GBR");
1222 assert_eq!(ev.ratings[1].value, 0x01);
1223
1224 assert_eq!(ev.crids.len(), 1 + 1); assert_eq!(ev.crids[0].crid_type, 0x01);
1226 assert_eq!(ev.crids[0].crid, "crid://bbc.co.uk/prog123");
1227 assert_eq!(ev.crids[1].crid_type, 0x03);
1228 assert_eq!(ev.crids[1].crid, "crid://bbc.co.uk/rec456");
1229 }
1230
1231 #[test]
1232 fn extract_service_name_through_feed_sdt() {
1233 use crate::collect::SectionSetCollector;
1234
1235 let svc_desc = {
1237 let provider = b"BBC";
1238 let name = b"BBC ONE HD";
1239 let mut v = vec![0x48u8, (1 + 1 + provider.len() + 1 + name.len()) as u8];
1240 v.push(0x01); v.push(provider.len() as u8);
1242 v.extend_from_slice(provider);
1243 v.push(name.len() as u8);
1244 v.extend_from_slice(name);
1245 v
1246 };
1247
1248 let sdt_bytes = {
1250 let dll = svc_desc.len() as u16;
1251 let svc_entry_len = 5 + dll as usize;
1253 let section_length: u16 = 5 + 3 + svc_entry_len as u16 + 4;
1255 let mut buf = vec![0u8; 3 + section_length as usize];
1256 buf[0] = 0x42; buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
1258 buf[2] = (section_length & 0xFF) as u8;
1259 buf[3..5].copy_from_slice(&1u16.to_be_bytes()); buf[5] = 0xC1; buf[6] = 0; buf[7] = 0; buf[8..10].copy_from_slice(&1u16.to_be_bytes()); buf[10] = 0xFF; let off = 11;
1268 buf[off..off + 2].copy_from_slice(&100u16.to_be_bytes()); buf[off + 2] = 0xFC; buf[off + 3] = ((dll >> 8) as u8) & 0x0F;
1271 buf[off + 4] = (dll & 0xFF) as u8;
1272 buf[off + 5..off + 5 + svc_desc.len()].copy_from_slice(&svc_desc);
1273
1274 let crc_off = buf.len() - 4;
1276 let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_off]);
1277 buf[crc_off..].copy_from_slice(&crc.to_be_bytes());
1278 buf
1279 };
1280
1281 let mut collector = SectionSetCollector::new();
1282 let complete = collector.push_section(&sdt_bytes).unwrap().unwrap();
1283 let sdt = complete.sdt().unwrap();
1284
1285 let mut store = EpgStore::new();
1286 store.feed_sdt(&sdt);
1287
1288 let key = ServiceKey {
1289 original_network_id: 1,
1290 transport_stream_id: 1,
1291 service_id: 100,
1292 };
1293 assert_eq!(store.service_name(key), Some("BBC ONE HD"));
1294 assert_eq!(store.service_count(), 1);
1295 }
1296
1297 #[test]
1302 fn version_churn_bounded_growth() {
1303 let s = |hh: u32| {
1306 let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
1307 let days = 61785u16; let mjd_bytes = days.to_be_bytes();
1309 let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
1310 (
1311 [
1312 mjd_bytes[0],
1313 mjd_bytes[1],
1314 bcd_time[0],
1315 bcd_time[1],
1316 bcd_time[2],
1317 ],
1318 t,
1319 )
1320 };
1321
1322 let (start1, _) = s(10);
1323 let (start2, _) = s(14);
1324
1325 let desc1 = short_event_bytes(b"News at 10", b"");
1326 let desc2 = short_event_bytes(b"News at 14", b"");
1327
1328 let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
1329 let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
1330
1331 let mut store = EpgStore::new();
1332 store.feed(&eit1).unwrap();
1333 assert_eq!(store.event_count(), 1);
1334 store.feed(&eit2).unwrap();
1335 assert_eq!(store.event_count(), 1);
1337
1338 let key = ServiceKey {
1339 original_network_id: 1,
1340 transport_stream_id: 1,
1341 service_id: 100,
1342 };
1343 let evts = store.events(key).unwrap();
1344 assert_eq!(evts.len(), 1);
1345 assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
1346 }
1347
1348 #[test]
1353 fn schedule_range_query() {
1354 let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
1355 let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1356 let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1357 let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1358
1359 let sec = core::time::Duration::from_secs(1800);
1360 let mut store = EpgStore::new();
1361 let key = ServiceKey {
1362 original_network_id: 1,
1363 transport_stream_id: 1,
1364 service_id: 100,
1365 };
1366 let svc = store.cache.entry(key).or_default();
1367 for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
1368 svc.events.insert(
1369 id,
1370 EpgEvent {
1371 event_id: id,
1372 start_time: Some(t),
1373 duration: Some(sec),
1374 running_status: 0,
1375 free_ca_mode: false,
1376 event_name: Some(format!("Event {id}")),
1377 event_text: None,
1378 extended_text: None,
1379 extended_items: vec![],
1380 content_nibbles: vec![],
1381 ratings: vec![],
1382 crids: vec![],
1383 },
1384 );
1385 }
1386
1387 let events = store.schedule(key, t1000, t1200).unwrap();
1389 assert_eq!(events.len(), 2);
1390 assert_eq!(events[0].event_id, 2);
1391 assert_eq!(events[1].event_id, 3);
1392
1393 let events = store.schedule(key, t1200, t1100).unwrap();
1395 assert!(events.is_empty());
1396 }
1397
1398 #[test]
1403 fn max_services_capped() {
1404 let mut store = EpgStore::new().with_max_services(2);
1407
1408 let desc = short_event_bytes(b"Test", b"");
1409
1410 let sr1 = start_raw(2026, 6, 10, 10);
1412 let eit1 = eit_pf_section(100, 1, 1, 1, 0, sr1, [1, 0, 0], &desc);
1413 store.feed(&eit1).unwrap();
1414 assert_eq!(store.service_count(), 1);
1415
1416 let sr2 = start_raw(2026, 6, 10, 11);
1418 let eit2 = eit_pf_section(200, 1, 1, 3, 0, sr2, [1, 0, 0], &desc);
1419 store.feed(&eit2).unwrap();
1420 assert_eq!(store.service_count(), 2);
1421
1422 let sr3 = start_raw(2026, 6, 10, 12);
1424 let eit3 = eit_pf_section(300, 1, 1, 5, 0, sr3, [1, 0, 0], &desc);
1425 store.feed(&eit3).unwrap();
1426 assert_eq!(
1427 store.service_count(),
1428 2,
1429 "third service must be rejected when cap is full"
1430 );
1431
1432 let key300 = ServiceKey {
1434 original_network_id: 1,
1435 transport_stream_id: 1,
1436 service_id: 300,
1437 };
1438 assert!(
1439 store.events(key300).is_none(),
1440 "rejected service must not appear"
1441 );
1442
1443 store.clear();
1445 store.feed(&eit3).unwrap();
1446 assert_eq!(store.service_count(), 1);
1447 assert!(store.events(key300).is_some());
1448 }
1449
1450 #[test]
1455 fn max_events_per_service_capped() {
1456 let mut store = EpgStore::new().with_max_events_per_service(3);
1459
1460 let desc = short_event_bytes(b"Test", b"");
1461 let key = ServiceKey {
1462 original_network_id: 1,
1463 transport_stream_id: 1,
1464 service_id: 100,
1465 };
1466
1467 for (version, (event_id, hour)) in [(10, 10u32), (20, 11), (30, 12), (40, 13)]
1468 .iter()
1469 .enumerate()
1470 {
1471 let sr = start_raw(2026, 6, 10, *hour);
1472 let eit = eit_pf_section(100, 1, 1, *event_id, version as u8, sr, [1, 0, 0], &desc);
1473 store.feed(&eit).unwrap();
1474 }
1475
1476 assert_eq!(store.event_count(), 3, "4th event must be skipped at cap 3");
1477
1478 let sr_v2 = start_raw(2026, 6, 10, 15);
1480 let eit_v2 = eit_pf_section(100, 1, 1, 10, 1, sr_v2, [1, 0, 0], &desc);
1481 store.feed(&eit_v2).unwrap();
1482 assert_eq!(
1483 store.event_count(),
1484 3,
1485 "version churn on existing event_id must not increase count"
1486 );
1487
1488 let evts = store.events(key).unwrap();
1489 let ev10 = evts.iter().find(|e| e.event_id == 10).unwrap();
1490 assert_eq!(
1491 ev10.event_name.as_deref(),
1492 Some("Test"),
1493 "existing event updated"
1494 );
1495 }
1496
1497 #[cfg(feature = "serde")]
1502 #[test]
1503 fn serde_serializes_store_as_json() {
1504 let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
1505 let mut store = EpgStore::new();
1506 let key = ServiceKey {
1507 original_network_id: 1,
1508 transport_stream_id: 1,
1509 service_id: 100,
1510 };
1511 let svc = store.cache.entry(key).or_default();
1512 svc.service_name = Some("BBC One".into());
1513 svc.events.insert(
1514 1,
1515 EpgEvent {
1516 event_id: 1,
1517 start_time: Some(t),
1518 duration: Some(core::time::Duration::from_secs(3600)),
1519 running_status: 4,
1520 free_ca_mode: false,
1521 event_name: Some("The News".into()),
1522 event_text: Some("Today's headlines".into()),
1523 extended_text: None,
1524 extended_items: vec![],
1525 content_nibbles: vec![ContentNibble {
1526 level_1: 1,
1527 level_2: 1,
1528 user: 0,
1529 }],
1530 ratings: vec![],
1531 crids: vec![],
1532 },
1533 );
1534
1535 let json = serde_json::to_string(&store).unwrap();
1536 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1537 let svc_data = &v["1-1-100"];
1538 assert_eq!(svc_data["service_name"], "BBC One");
1539 assert_eq!(svc_data["events"][0]["event_name"], "The News");
1540 assert_eq!(
1541 svc_data["events"][0]["content_nibbles"][0],
1542 serde_json::json!({"level_1": 1, "level_2": 1, "user": 0})
1543 );
1544 }
1545}