use std::io::BufRead;
use std::io::Read;
use std::io::Write;
use std::str::FromStr;
use clap::{Parser, Subcommand};
const POLL_INTERVAL: u64 = 5;
#[derive(Parser, Debug)]
#[clap(version)]
struct Args {
#[clap(value_parser)]
path: String,
#[clap(subcommand)]
command: Commands,
}
#[derive(Subcommand, Debug)]
enum Commands {
Put {
#[clap(long, value_parser)]
topic: Option<String>,
#[clap(long, value_parser)]
attribute: Option<String>,
#[clap(short, long, action)]
follow: bool,
},
Get {
#[clap(value_parser)]
id: String,
},
Cat {
#[clap(short, long, action)]
follow: bool,
#[clap(long, action)]
sse: bool,
#[clap(short, long, value_parser)]
last_id: Option<scru128::Scru128Id>,
},
Call {
#[clap(long, value_parser)]
topic: String,
},
Serve {
#[clap(long, value_parser)]
topic: String,
#[clap(value_parser)]
command: String,
#[clap(value_parser)]
args: Vec<String>,
},
}
fn main() {
let params = Args::parse();
let env = xs_lib::store_open(std::path::Path::new(¶ms.path)).unwrap_or_else(|err| {
eprintln!("Error opening store: {}", err);
std::process::exit(1);
});
match ¶ms.command {
Commands::Put {
topic,
attribute,
follow,
} => {
if *follow {
let stdin = std::io::stdin();
for line in stdin.lock().lines() {
let data = line.unwrap();
match xs_lib::store_put(&env, topic.clone(), attribute.clone(), data) {
Ok(id) => println!("{}", id),
Err(err) => eprintln!("Error putting data: {}", err),
}
}
} else {
let mut data = String::new();
std::io::stdin().read_to_string(&mut data).unwrap();
match xs_lib::store_put(&env, topic.clone(), attribute.clone(), data) {
Ok(id) => println!("{}", id),
Err(err) => eprintln!("Error putting data: {}", err),
}
}
}
Commands::Get { id } => {
let id = scru128::Scru128Id::from_str(id).unwrap();
match xs_lib::store_get(&env, id) {
Ok(Some(frame)) => {
let frame = serde_json::to_string(&frame).unwrap();
println!("{}", frame);
}
Ok(None) => eprintln!("No frame found for id: {}", id),
Err(err) => eprintln!("Error getting frame: {}", err),
}
}
Commands::Cat {
follow,
sse,
last_id,
} => {
if *sse {
println!(": welcome");
}
let mut last_id = *last_id;
let mut signals =
signal_hook::iterator::Signals::new(signal_hook::consts::TERM_SIGNALS).unwrap();
loop {
match xs_lib::store_cat(&env, last_id) {
Ok(frames) => {
for frame in frames {
last_id = Some(frame.id);
let data = serde_json::to_string(&frame).unwrap();
match sse {
true => {
println!("id: {}", frame.id);
println!("data: {}\n", data);
}
false => println!("{}", data),
}
}
}
Err(err) => eprintln!("Error getting frames: {}", err),
}
if !follow {
return;
}
std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL));
if signals.pending().next().is_some() {
return;
}
}
}
Commands::Call { topic } => {
let mut data = String::new();
std::io::stdin().read_to_string(&mut data).unwrap();
let id =
match xs_lib::store_put(&env, Some(topic.clone()), Some(".request".into()), data) {
Ok(id) => id,
Err(err) => {
eprintln!("Error putting request: {}", err);
return;
}
};
let mut last_id = Some(id);
let mut signals =
signal_hook::iterator::Signals::new(signal_hook::consts::TERM_SIGNALS).unwrap();
loop {
match xs_lib::store_cat(&env, last_id) {
Ok(frames) => {
for frame in frames {
last_id = Some(frame.id);
if frame.topic != Some(topic.to_string())
|| frame.attribute != Some(".response".into())
{
continue;
}
let response: xs_lib::ResponseFrame =
serde_json::from_str(&frame.data).unwrap();
if response.source_id == id {
print!("{}", response.data);
}
return;
}
}
Err(err) => eprintln!("Error getting frames: {}", err),
}
std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL));
if signals.pending().next().is_some() {
return;
}
}
}
Commands::Serve {
topic,
command,
args,
} => {
let mut last_id = xs_lib::store_cat(&env, None)
.unwrap_or_else(|err| {
eprintln!("Error getting frames: {}", err);
std::process::exit(1);
})
.iter()
.filter(|frame| {
frame.topic == Some(topic.to_string())
&& frame.attribute == Some(".response".into())
})
.last()
.map(|frame| {
serde_json::from_str::<xs_lib::ResponseFrame>(&frame.data)
.unwrap()
.source_id
});
let mut signals =
signal_hook::iterator::Signals::new(signal_hook::consts::TERM_SIGNALS).unwrap();
loop {
match xs_lib::store_cat(&env, last_id) {
Ok(frames) => {
for frame in frames {
last_id = Some(frame.id);
if frame.topic != Some(topic.to_string())
|| frame.attribute != Some(".request".into())
{
continue;
}
let mut p = std::process::Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("failed to spawn");
{
let mut stdin = p.stdin.take().expect("failed to take stdin");
stdin.write_all(frame.data.as_bytes()).unwrap();
}
let res = p.wait_with_output().unwrap();
let data = String::from_utf8(res.stdout).unwrap();
let res = xs_lib::ResponseFrame {
source_id: frame.id,
data,
};
let data = serde_json::to_string(&res).unwrap();
let _ = xs_lib::store_put(
&env,
Some(topic.clone()),
Some(".response".into()),
data,
)
.unwrap_or_else(|err| {
eprintln!("Error putting response: {}", err);
std::process::exit(1);
});
}
}
Err(err) => eprintln!("Error getting frames: {}", err),
}
std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL));
if signals.pending().next().is_some() {
return;
}
}
}
}
}