use std::process::Child;
use byteorder::{NativeEndian, WriteBytesExt};
use chrono::{DateTime, Utc};
use log::{debug, error, warn};
use sameold::{Message, MessageHeader, SameReceiver};
use crate::cli::Args;
use crate::spawner;
/// Drives the application's state machine over a stream of audio samples.
///
/// In normal operation the machine alternates between waiting for a message
/// and handling it, and returns once `until_message_start` reports that no
/// further message is available.
///
/// When `args.demo` is set, no message is decoded from `input`. Instead a
/// synthetic demonstration message is handled as if it had just been
/// received, with at most `receiver.input_rate() * 8` samples of `input`
/// offered to it, followed by three synthetic end-of-message events.
pub fn run<I>(args: &Args, receiver: &mut SameReceiver, mut input: I)
where
    I: Iterator<Item = i16>,
{
    if !args.demo {
        // Live mode: wait for a message, handle it, repeat.
        let mut waiting = State::<Waiting>::new();
        while let Some(alerting) = waiting.until_message_start(receiver, &mut input) {
            waiting = alerting.until_message_end(args, receiver, &mut input);
        }
        return;
    }

    // Demo mode: skip straight to "alerting" with an injected message.
    // NOTE(review): presumably eight seconds of audio, if `input_rate()` is
    // in samples per second -- confirm against the receiver's documentation.
    let demo_samples = (receiver.input_rate() * 8) as usize;
    let demo_message = make_demo_message(&Utc::now());
    warn!("demonstration (--demo) mode: the following messages are NOT LIVE!");

    State::<Alerting>::from(demo_message).until_message_end(
        args,
        receiver,
        &mut input.take(demo_samples),
    );

    // Each end-of-message is handled without reading any audio, so a
    // single dummy sample is enough to satisfy the signature.
    for _ in 0..3 {
        State::<Alerting>::from(Message::EndOfMessage).until_message_end(
            args,
            receiver,
            &mut std::iter::once(0i16),
        );
    }
}
/// Application state machine, parameterised by its current state.
///
/// The type parameter `S` is either `Waiting` or `Alerting`; each has its own
/// `impl` block, so only the transition valid for the current state can be
/// called.
#[derive(Debug)]
struct State<S> {
/// Data that belongs to the current state.
state: S,
}
/// State in which no message is being handled. Carries no data.
#[derive(Debug)]
struct Waiting {}
/// State in which a received message is being handled.
#[derive(Debug)]
struct Alerting {
/// Message that still has to be handled, if any. It is taken out when
/// handling starts and refilled with whatever message the child-process
/// run returned.
next: Option<Message>,
}
impl State<Waiting> {
    /// Creates the state machine in its initial `Waiting` state.
    ///
    /// The constructor lives on `State<Waiting>` rather than on a generic
    /// `State<S>`, so that asking any other state type for `new()` is a
    /// compile error instead of silently producing a waiting state.
    pub fn new() -> Self {
        State { state: Waiting {} }
    }
}
impl State<Waiting> {
pub fn until_message_start<I>(
self,
receiver: &mut SameReceiver,
input: &mut I,
) -> Option<State<Alerting>>
where
I: Iterator<Item = i16>,
{
for msg in receiver.iter_messages(input.map(|sa| sa as f32)) {
return Some(msg.into());
}
None
}
}
impl State<Alerting> {
    /// Handles the pending message, and every message that follows it, until
    /// the alert is over; then returns to the `Waiting` state.
    ///
    /// Each pending message is printed to standard output unless
    /// `config.quiet` is set. An end-of-message finishes the alert. A
    /// start-of-message spawns a child via `spawner::spawn`, which receives
    /// `config.child[0]`, `config.child[1..]` (presumably the program and its
    /// arguments), the message header and `config.rate` rendered as text.
    /// `input` is then streamed to the child through `run_child`; the message
    /// that ended the stream, if any, becomes the next pending message, and
    /// the child is waited for.
    ///
    /// The alert also ends early when `config.child` is empty or when the
    /// child cannot be spawned.
    pub fn until_message_end<I>(
        mut self,
        config: &Args,
        receiver: &mut SameReceiver,
        input: &mut I,
    ) -> State<Waiting>
    where
        I: Iterator<Item = i16>,
    {
        while let Some(pending) = self.state.next.take() {
            if !config.quiet {
                println!("{}", pending);
            }

            let header = match pending {
                Message::EndOfMessage => break,
                Message::StartOfMessage(header) => header,
            };

            if config.child.is_empty() {
                debug!("no child process to spawn");
                break;
            }

            let rate_arg = config.rate.to_string();
            let spawned = spawner::spawn(
                &config.child[0],
                &config.child[1..],
                &header,
                &rate_arg,
            );
            let mut process = match spawned {
                Ok(process) => process,
                Err(err) => {
                    error!("unable to spawn child process: {}", err);
                    break;
                }
            };
            debug!("spawned child process PID {}", process.id());

            // Whatever message interrupted the child is handled on the next
            // pass through this loop.
            self.state.next = run_child(&mut process, receiver, input);

            match process.wait() {
                Ok(status) if status.success() => {
                    debug!("child process exited successfully");
                }
                Ok(status) => {
                    warn!(
                        "child process exited abnormally with status {}",
                        status.code().unwrap_or(1)
                    );
                }
                Err(err) => {
                    error!("unable to await child process exit: {}", err);
                }
            }
        }

        self.into()
    }
}
/// Streams audio to a child process until the receiver decodes a message.
///
/// Takes the child's standard-input pipe and writes every sample pulled from
/// `input` to it as a native-endian `i16`, while feeding the same sample (as
/// `f32`) to `receiver`. Streaming stops at the first decoded message.
///
/// Returns that message, or `None` when the receiver's message iterator ends
/// first or when the child has no standard-input pipe to take. A
/// start-of-message is still returned, after logging a warning. The pipe is
/// dropped when this function returns.
fn run_child<I>(child: &mut Child, receiver: &mut SameReceiver, input: &mut I) -> Option<Message>
where
    I: Iterator<Item = i16>,
{
    let mut child_pipe = match child.stdin.take() {
        Some(pipe) => pipe,
        None => {
            error!("unable to create pipe to child process");
            return None;
        }
    };

    // Only the first message matters, so request it directly instead of
    // writing a `for` loop whose body can never run twice.
    let next_message = receiver
        .iter_messages(
            input
                .inspect(|sa| {
                    // Write errors are deliberately discarded: decoding
                    // carries on even if the child stops reading.
                    let _ = child_pipe.write_i16::<NativeEndian>(*sa);
                })
                .map(|sa| sa as f32),
        )
        .next();

    if matches!(next_message, Some(Message::StartOfMessage(_))) {
        warn!("received SAME start-of-message without end-of-message");
    }

    next_message
}
impl From<Message> for State<Alerting> {
fn from(message: Message) -> Self {
debug!("new state: alerting");
Self {
state: Alerting {
next: Some(message),
},
}
}
}
impl From<State<Alerting>> for State<Waiting> {
fn from(_state: State<Alerting>) -> Self {
debug!("new state: waiting");
Self { state: Waiting {} }
}
}
/// Builds the synthetic start-of-message used by demonstration mode.
///
/// The header text is fixed except for its issue-time field, which is `at`
/// formatted as `%j%H%M` (day of year, hour, minute).
///
/// # Panics
///
/// Panics if `MessageHeader::new` rejects the generated header text.
fn make_demo_message(at: &DateTime<Utc>) -> Message {
    let issue_time = at.format("%j%H%M");
    let header_text = format!("ZCZC-EAS-DMO-999000+0015-{}-N0 CALL -", issue_time);
    let header = MessageHeader::new(header_text).expect("unable to create DMO message");
    Message::StartOfMessage(header)
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, TimeZone, Utc};
    use sameold::EventCode;

    /// The demo message must parse as a practice/demo warning that was
    /// issued at the requested time and stays valid for fifteen minutes.
    #[test]
    fn test_make_demo_message() {
        let issued = Utc.with_ymd_and_hms(2020, 12, 31, 23, 22, 0).unwrap();

        let header = match make_demo_message(&issued) {
            Message::StartOfMessage(header) => header,
            _ => unreachable!(),
        };

        assert_eq!(header.event().unwrap(), EventCode::PracticeDemoWarning);
        assert_eq!(header.issue_datetime(&issued).unwrap(), issued);
        assert_eq!(header.valid_duration(), Duration::minutes(15));
    }
}