extern crate va_ts as ts;
mod error;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
use std::net::{Ipv4Addr, UdpSocket};
use std::process;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use clap::{App, Arg};
use url::{Host, Url};
use error::{Error, Kind as ErrorKind, Result};
trait Input {
fn open(&mut self) -> Result<()>;
fn read(&mut self) -> Result<()>;
fn close(&mut self) -> Result<()>;
}
struct DemuxerTSEvents {
done_once: HashSet<ts::SubtableID>,
}
impl Default for DemuxerTSEvents {
fn default() -> Self {
DemuxerTSEvents {
done_once: Default::default(),
}
}
}
struct EITFmt<'t>(&'t ts::DemuxedTable);
impl<'t> fmt::Display for EITFmt<'t> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for section_ref in self.0.sections.0.iter() {
let section = (*section_ref).borrow();
let raw = section.buf.0.get_ref().as_slice();
let eit = ts::EIT::new(raw);
for event in eit.events().filter_map(ts::Result::ok) {
write!(
f,
" {} ~ {}\n",
event.start_time(),
ts::DurationFmt::from(event.duration()),
)?;
if let Some(descs) = event.descriptors() {
for desc in descs
.filter_map(ts::Result::ok)
.filter(|d| d.is_dvb_short_event())
{
match desc.tag() {
ts::Tag::DVB(ts::TagDVB::ShortEvent) => {
let desc = ts::DescDVB0x4D::new(desc.buf_data());
let mut dst_buf = [0u8; 256];
let mut dst_str = std::str::from_utf8_mut(&mut dst_buf).unwrap();
match ts::AnnexA2::decode(desc.event_name(), &mut dst_str) {
Ok(..) => write!(f, r#" "{}""#, dst_str),
Err(err) => write!(f, " (error: {:?})", err),
}?;
dst_buf = [0u8; 256];
dst_str = std::str::from_utf8_mut(&mut dst_buf).unwrap();
match ts::AnnexA2::decode(desc.text(), &mut dst_str) {
Ok(..) => write!(f, r#" "{}""#, dst_str),
Err(err) => write!(f, " (error: {})", err),
}?;
write!(f, "\n")?;
}
_ => {}
}
}
}
}
}
Ok(())
}
}
impl ts::DemuxerEvents for DemuxerTSEvents {
fn on_table(&mut self, id: ts::SubtableID, tbl: &ts::DemuxedTable) {
if self.done_once.contains(&id) {
return;
} else {
self.done_once.insert(id);
}
match id {
ts::SubtableID::EIT(..) => {
print!(":EIT\n{}", EITFmt(tbl));
}
_ => {
for section_ref in tbl.sections.0.iter() {
let section = (*section_ref).borrow();
let raw = section.buf.0.get_ref().as_slice();
match id {
ts::SubtableID::PAT(..) => {
println!("{:?}", ts::PAT::new(raw));
}
ts::SubtableID::SDT(..) => {
println!("{:?}", ts::SDT::new(raw));
}
ts::SubtableID::PMT(..) => {
println!("{:?}", ts::PMT::new(raw));
}
ts::SubtableID::EIT(..) => {
println!("{:?}", ts::EIT::new(raw));
}
};
}
}
}
}
fn on_packet(&mut self, pkt: &ts::DemuxedPacket) {
println!(
"(0x{:016X}) :pid {:?} :pts {:?} :dts {:?} :sz {}",
pkt.offset,
pkt.pid,
pkt.pts.map(ts::DurationFmt::from),
pkt.dts.map(ts::DurationFmt::from),
pkt.buf.sz(),
);
}
}
struct InputUDP {
url: Url,
buf: Arc<(Mutex<VecDeque<[u8; ts::Packet::SZ]>>, Condvar)>,
demuxer: ts::Demuxer<DemuxerTSEvents>,
}
impl InputUDP {
pub fn new(url: Url, buf_cap: usize) -> InputUDP {
InputUDP {
url: url,
buf: Arc::new((Mutex::new(VecDeque::with_capacity(buf_cap)), Condvar::new())),
demuxer: ts::Demuxer::new(Default::default()),
}
}
}
impl Input for InputUDP {
fn open(&mut self) -> Result<()> {
let input_host = self
.url
.host()
.ok_or(Error::new(ErrorKind::InputUrlMissingHost))?;
let input_port = self.url.port().unwrap_or(5500);
let input_host_domain = match input_host {
Host::Domain(v) => Ok(v),
_ => Err(Error::new(ErrorKind::InputUrlHostMustBeDomain)),
}?;
let iface = Ipv4Addr::new(0, 0, 0, 0);
println!(
"[<] {:?}: {:?} @ {:?}",
input_host_domain, input_port, iface
);
let input_host_ip_v4: Ipv4Addr = input_host_domain.parse().unwrap();
let socket = UdpSocket::bind((input_host_domain, input_port))?;
if let Err(e) = socket.join_multicast_v4(&input_host_ip_v4, &iface) {
eprintln!("error join-multiocast-v4: {}", e);
}
let pair = self.buf.clone();
thread::spawn(move || {
let mut ts_pkt_raw: [u8; ts::Packet::SZ] = [0; ts::Packet::SZ];
loop {
let mut pkts_raw = [0; 7 * ts::Packet::SZ];
let (_, _) = socket.recv_from(&mut pkts_raw).unwrap();
let &(ref lock, ref cvar) = &*pair;
let mut buf = match lock.lock() {
Err(e) => {
eprintln!("lock and get buffer failed: {}", e);
continue;
}
Ok(buf) => buf,
};
for pkt_index in 0..7 * ts::Packet::SZ / ts::Packet::SZ {
let ts_pkt_raw_src =
&pkts_raw[pkt_index * ts::Packet::SZ..(pkt_index + 1) * ts::Packet::SZ];
ts_pkt_raw.copy_from_slice(ts_pkt_raw_src);
buf.push_back(ts_pkt_raw);
}
cvar.notify_all();
}
});
Ok(())
}
fn read(&mut self) -> Result<()> {
let pair = self.buf.clone();
let &(ref lock, ref cvar) = &*pair;
let mut buf = lock.lock().ok().ok_or(Error::new_with_details(
ErrorKind::SyncPoison,
"udp read lock error",
))?;
buf = cvar.wait(buf).ok().ok_or(Error::new_with_details(
ErrorKind::SyncPoison,
"udp read cwar wait error",
))?;
while !buf.is_empty() {
let ts_pkt_raw = buf.pop_front().unwrap();
if let Err(e) = self.demuxer.demux(&ts_pkt_raw) {
eprintln!("error demux ts-packet: ({:?})", e);
}
}
Ok(())
}
fn close(&mut self) -> Result<()> {
println!("<<< UDP close");
Ok(())
}
}
struct Wrkr<I> {
input: Arc<Mutex<I>>,
}
impl<I> Wrkr<I>
where
I: Input + std::marker::Send + 'static,
{
pub fn new(input: I) -> Wrkr<I> {
Wrkr {
input: Arc::new(Mutex::new(input)),
}
}
pub fn run(&self) -> Result<()> {
let input = self.input.clone();
{
input.lock().unwrap().open()?;
}
thread::spawn(move || loop {
match input.lock().unwrap().read() {
Err(err) => {
eprintln!("error read {}", err);
return;
}
Ok(_) => {}
}
});
Ok(())
}
}
fn main() {
let matches = App::new("V/A tool")
.version("0.0.3")
.author("Ivan Egorov <vany.egorov@gmail.com>")
.about("simple mpeg-ts mcast probe")
.arg(
Arg::with_name("input")
.short("i")
.long("input")
.help("Sets the input file to use")
.required(true)
.takes_value(true),
)
.get_matches();
let input_raw = matches.value_of("input").unwrap();
let input_url = match Url::parse(input_raw) {
Ok(v) => v,
Err(err) => {
eprintln!("error parse input url: {:?}\n", err);
process::exit(1);
}
};
let input = InputUDP::new(input_url, 5000 * 7);
let wrkr = Wrkr::new(input);
if let Err(err) = wrkr.run() {
eprintln!("error start worker: {:?}\n", err);
process::exit(1);
}
loop {
thread::sleep(Duration::from_secs(1));
}
}