use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use clap::{Parser, Subcommand};
use dirs::config_dir;
use tokio::io::AsyncWriteExt;
use xs::nu;
use xs::store::{
parse_ttl, validate_topic, validate_topic_query, FollowOption, ReadOptions, Store, StoreError,
};
fn parse_topic(s: &str) -> Result<String, String> {
validate_topic(s).map_err(|e| e.to_string())?;
Ok(s.to_string())
}
fn parse_topic_query(s: &str) -> Result<String, String> {
validate_topic_query(s).map_err(|e| e.to_string())?;
Ok(s.to_string())
}
#[derive(Parser, Debug)]
#[clap(version)]
struct Args {
#[clap(subcommand)]
command: Command,
}
#[derive(Subcommand, Debug)]
enum Command {
Serve(CommandServe),
Cat(CommandCat),
Append(CommandAppend),
Cas(CommandCas),
CasPost(CommandCasPost),
Remove(CommandRemove),
Last(CommandLast),
Get(CommandGet),
Import(CommandImport),
Version(CommandVersion),
Nu(CommandNu),
Scru128(CommandScru128),
Eval(CommandEval),
}
#[derive(Parser, Debug)]
struct CommandServe {
#[clap(value_parser)]
path: PathBuf,
#[clap(long, value_parser, value_name = "LISTEN_ADDR")]
expose: Option<String>,
}
#[derive(Parser, Debug)]
struct CommandCat {
#[clap(value_parser)]
addr: String,
#[clap(long, short = 'f')]
follow: bool,
#[clap(long, short = 'p')]
pulse: Option<u64>,
#[clap(long, short = 'n')]
new: bool,
#[clap(long, short = 'a')]
after: Option<String>,
#[clap(long)]
from: Option<String>,
#[clap(long)]
limit: Option<u64>,
#[clap(long)]
last: Option<u64>,
#[clap(long)]
sse: bool,
#[clap(long = "topic", short = 'T', value_parser = parse_topic_query)]
topic: Option<String>,
#[clap(long)]
with_timestamp: bool,
}
#[derive(Parser, Debug)]
struct CommandAppend {
#[clap(value_parser)]
addr: String,
#[clap(value_parser = parse_topic)]
topic: String,
#[clap(long, value_parser)]
meta: Option<String>,
#[clap(long)]
ttl: Option<String>,
#[clap(long)]
with_timestamp: bool,
}
#[derive(Parser, Debug)]
struct CommandCas {
#[clap(value_parser)]
addr: String,
#[clap(value_parser)]
hash: String,
}
#[derive(Parser, Debug)]
struct CommandCasPost {
#[clap(value_parser)]
addr: String,
}
#[derive(Parser, Debug)]
struct CommandRemove {
#[clap(value_parser)]
addr: String,
#[clap(value_parser)]
id: String,
}
#[derive(Parser, Debug)]
struct CommandLast {
#[clap(value_parser)]
addr: String,
#[clap(value_parser)]
args: Vec<String>,
#[clap(long, short = 'f')]
follow: bool,
#[clap(long)]
with_timestamp: bool,
}
impl CommandLast {
fn parse_args(
&self,
) -> Result<(Option<String>, usize), Box<dyn std::error::Error + Send + Sync>> {
let mut topic: Option<String> = None;
let mut count: usize = 1;
for arg in &self.args {
if let Ok(n) = arg.parse::<usize>() {
count = n;
} else {
validate_topic_query(arg)?;
topic = Some(arg.clone());
}
}
Ok((topic, count))
}
}
#[derive(Parser, Debug)]
struct CommandGet {
#[clap(value_parser)]
addr: String,
#[clap(value_parser)]
id: String,
#[clap(long)]
with_timestamp: bool,
}
#[derive(Parser, Debug)]
struct CommandEval {
#[clap(value_parser)]
addr: String,
#[clap(value_parser)]
file: Option<String>,
#[clap(short = 'c', long = "commands")]
commands: Option<String>,
}
fn extract_addr_from_command(command: &Command) -> Option<String> {
match command {
Command::Cat(cmd) => Some(cmd.addr.clone()),
Command::Append(cmd) => Some(cmd.addr.clone()),
Command::Cas(cmd) => Some(cmd.addr.clone()),
Command::CasPost(cmd) => Some(cmd.addr.clone()),
Command::Remove(cmd) => Some(cmd.addr.clone()),
Command::Last(cmd) => Some(cmd.addr.clone()),
Command::Get(cmd) => Some(cmd.addr.clone()),
Command::Import(cmd) => Some(cmd.addr.clone()),
Command::Version(cmd) => Some(cmd.addr.clone()),
Command::Eval(cmd) => Some(cmd.addr.clone()),
Command::Serve(_) | Command::Nu(_) | Command::Scru128(_) => None,
}
}
fn format_connection_error(addr: &str) -> String {
format!("no store at: {addr}\nto start one:\n xs serve {addr}")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");
nu_command::tls::CRYPTO_PROVIDER
.default()
.then_some(())
.expect("failed to set nu_command crypto provider");
let args = Args::parse();
let addr = extract_addr_from_command(&args.command);
let res = match args.command {
Command::Serve(args) => serve(args).await,
Command::Cat(args) => cat(args).await,
Command::Append(args) => append(args).await,
Command::Cas(args) => cas(args).await,
Command::CasPost(args) => cas_post(args).await,
Command::Remove(args) => remove(args).await,
Command::Last(args) => last(args).await,
Command::Get(args) => get(args).await,
Command::Import(args) => import(args).await,
Command::Version(args) => version(args).await,
Command::Eval(args) => eval(args).await,
Command::Nu(args) => run_nu(args),
Command::Scru128(args) => run_scru128(args),
};
if let Err(err) = res {
if xs::error::NotFound::is_not_found(&err) {
std::process::exit(1);
}
else if xs::error::has_not_found_io_error(&err) {
if let Some(addr) = addr {
eprintln!("{}", format_connection_error(&addr));
} else {
eprintln!("command error: {err}");
}
std::process::exit(1);
}
else {
eprintln!("command error: {err}");
std::process::exit(1);
}
}
Ok(())
}
async fn serve(args: CommandServe) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
xs::trace::init();
tracing::trace!("Starting server with path: {:?}", args.path);
let store = match Store::new(args.path.clone()) {
Ok(store) => store,
Err(StoreError::Locked) => {
let sock_path = args.path.join("sock");
eprintln!("store locked: {} (already running)", args.path.display());
eprintln!("connect to it:");
eprintln!(
" curl --unix-socket {} http://localhost/",
sock_path.display()
);
eprintln!("or with xs.nu:");
eprintln!(
" with-env {{XS_ADDR: \"{}\"}} {{ .cat }}",
sock_path.display()
);
std::process::exit(1);
}
Err(e) => return Err(e.into()),
};
let engine = nu::Engine::new()?;
{
let store = store.clone();
tokio::spawn(async move {
let _ = xs::trace::log_stream(store).await;
});
}
{
let store = store.clone();
tokio::spawn(async move {
if let Err(e) = xs::processor::actor::run(store).await {
eprintln!("Actor processor error: {e}");
}
});
}
let service_handle = {
let store = store.clone();
tokio::spawn(async move {
if let Err(e) = xs::processor::service::run(store).await {
eprintln!("Service processor error: {e}");
}
})
};
{
let store = store.clone();
tokio::spawn(async move {
if let Err(e) = xs::processor::action::run(store).await {
eprintln!("Action processor error: {e}");
}
});
}
tokio::select! {
res = xs::api::serve(store.clone(), engine.clone(), args.expose) => { res?; }
_ = tokio::signal::ctrl_c() => {}
}
store.append(xs::store::Frame::builder("xs.stopping").build())?;
let _ = tokio::time::timeout(Duration::from_secs(3), service_handle).await;
Ok(())
}
async fn cat(args: CommandCat) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let after = if let Some(after) = &args.after {
match scru128::Scru128Id::from_str(after) {
Ok(id) => Some(id),
Err(_) => return Err(format!("Invalid after: {after}").into()),
}
} else {
None
};
let from = if let Some(from) = &args.from {
match scru128::Scru128Id::from_str(from) {
Ok(id) => Some(id),
Err(_) => return Err(format!("Invalid from: {from}").into()),
}
} else {
None
};
let options = ReadOptions::builder()
.new(args.new)
.follow(if let Some(pulse) = args.pulse {
FollowOption::WithHeartbeat(Duration::from_millis(pulse))
} else if args.follow {
FollowOption::On
} else {
FollowOption::Off
})
.maybe_after(after)
.maybe_from(from)
.maybe_limit(args.limit.map(|l| l as usize))
.maybe_last(args.last.map(|l| l as usize))
.maybe_topic(args.topic.clone())
.build();
let mut receiver = xs::client::cat(&args.addr, options, args.sse, args.with_timestamp).await?;
let mut stdout = tokio::io::stdout();
#[cfg(unix)]
let result = {
use nix::unistd::dup;
use std::io::Write;
use std::os::unix::io::{AsRawFd, FromRawFd};
use tokio::io::unix::AsyncFd;
let stdout_fd = std::io::stdout().as_raw_fd();
let dup_fd = dup(stdout_fd)?;
let stdout_file = unsafe { std::fs::File::from_raw_fd(dup_fd) };
let async_fd = AsyncFd::new(stdout_file)?;
async {
loop {
tokio::select! {
maybe_bytes = receiver.recv() => {
match maybe_bytes {
Some(bytes) => {
if let Err(e) = stdout.write_all(&bytes).await {
if e.kind() == std::io::ErrorKind::BrokenPipe {
break;
}
return Err(e);
}
stdout.flush().await?;
}
None => break,
}
},
Ok(mut guard) = async_fd.writable() => {
let ready = guard.ready();
if ready.is_write_closed() {
break;
}
match guard.try_io(|inner| inner.get_ref().write(&[])) {
Ok(Err(e)) if e.kind() == std::io::ErrorKind::BrokenPipe => break,
Ok(Err(e)) => return Err(e), _ => {} }
guard.clear_ready_matching(ready);
}
}
}
Ok::<_, std::io::Error>(())
}
.await
};
#[cfg(not(unix))]
let result = {
async {
while let Some(bytes) = receiver.recv().await {
stdout.write_all(&bytes).await?;
stdout.flush().await?;
}
Ok::<_, std::io::Error>(())
}
.await
};
match result {
Ok(_) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()),
Err(e) => Err(e.into()),
}
}
use std::io::IsTerminal;
use tokio::io::stdin;
use tokio::io::AsyncRead;
async fn append(args: CommandAppend) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let meta = args
.meta
.as_ref()
.map(|meta_str| serde_json::from_str(meta_str))
.transpose()?;
let ttl = match args.ttl {
Some(ref ttl_str) => Some(parse_ttl(ttl_str)?),
None => None,
};
let input: Box<dyn AsyncRead + Unpin + Send> = if !std::io::stdin().is_terminal() {
Box::new(stdin())
} else {
Box::new(tokio::io::empty())
};
let response = xs::client::append(
&args.addr,
&args.topic,
input,
meta.as_ref(),
ttl,
args.with_timestamp,
)
.await?;
tokio::io::stdout().write_all(&response).await?;
Ok(())
}
async fn cas(args: CommandCas) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let integrity = ssri::Integrity::from_str(&args.hash)?;
let mut stdout = tokio::io::stdout();
xs::client::cas_get(&args.addr, integrity, &mut stdout).await?;
stdout.flush().await?;
Ok(())
}
async fn cas_post(args: CommandCasPost) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let input: Box<dyn AsyncRead + Unpin + Send> = if !std::io::stdin().is_terminal() {
Box::new(stdin())
} else {
Box::new(tokio::io::empty())
};
let response = xs::client::cas_post(&args.addr, input).await?;
tokio::io::stdout().write_all(&response).await?;
Ok(())
}
async fn remove(args: CommandRemove) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
xs::client::remove(&args.addr, &args.id).await?;
Ok(())
}
async fn last(args: CommandLast) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (topic, count) = args.parse_args()?;
xs::client::last(
&args.addr,
topic.as_deref(),
count,
args.follow,
args.with_timestamp,
)
.await
}
async fn get(args: CommandGet) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let response = xs::client::get(&args.addr, &args.id, args.with_timestamp).await?;
tokio::io::stdout().write_all(&response).await?;
Ok(())
}
#[derive(Parser, Debug)]
struct CommandImport {
#[clap(value_parser)]
addr: String,
}
async fn import(args: CommandImport) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let input: Box<dyn AsyncRead + Unpin + Send> = if !std::io::stdin().is_terminal() {
Box::new(stdin())
} else {
Box::new(tokio::io::empty())
};
let response = xs::client::import(&args.addr, input).await?;
tokio::io::stdout().write_all(&response).await?;
Ok(())
}
#[derive(Parser, Debug)]
struct CommandVersion {
#[clap(value_parser)]
addr: String,
}
async fn version(args: CommandVersion) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let response = xs::client::version(&args.addr).await?;
println!("{}", String::from_utf8_lossy(&response));
Ok(())
}
async fn eval(args: CommandEval) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tokio::io::{stdin, AsyncReadExt};
let script = match (&args.file, &args.commands) {
(Some(_), Some(_)) => {
eprintln!("error: cannot specify both file and -c");
std::process::exit(1);
}
(None, None) => {
eprintln!("error: provide a script file or use -c");
std::process::exit(1);
}
(Some(path), None) if path == "-" => {
let mut script_content = String::new();
stdin().read_to_string(&mut script_content).await?;
script_content
}
(Some(path), None) => match tokio::fs::read_to_string(path).await {
Ok(content) => content,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
eprintln!("file not found: \"{path}\"");
eprintln!("to run an inline script, use -c:");
eprintln!(" xs eval {} -c \"{}\"", args.addr, path);
std::process::exit(1);
}
Err(e) => return Err(e.into()),
},
(None, Some(cmd)) => cmd.clone(),
};
xs::client::eval(&args.addr, script).await?;
Ok(())
}
#[derive(Parser, Debug)]
struct CommandNu {
#[clap(long)]
install: bool,
#[clap(long)]
clean: bool,
#[clap(long, value_parser)]
lib_path: Option<PathBuf>,
#[clap(long, value_parser)]
autoload_path: Option<PathBuf>,
}
#[derive(Parser, Debug)]
struct CommandScru128 {
#[clap(subcommand)]
command: Option<Scru128Command>,
}
#[derive(Subcommand, Debug)]
enum Scru128Command {
Unpack {
id: String,
},
Pack,
}
const XS_NU: &str = include_str!("../xs.nu");
fn lib_dirs() -> Vec<PathBuf> {
let mut dirs = Vec::new();
if let Some(conf) = config_dir() {
dirs.push(conf.join("nushell").join("scripts"));
}
if let Ok(extra) = std::env::var("NU_LIB_DIRS") {
dirs.extend(std::env::split_paths(&extra));
}
dirs
}
fn autoload_dirs() -> Vec<PathBuf> {
let mut dirs = Vec::new();
if let Some(conf) = config_dir() {
dirs.push(conf.join("nushell").join("vendor").join("autoload"));
}
dirs.extend(nu_vendor_autoload_dirs());
dirs
}
fn nu_vendor_autoload_dirs() -> Vec<PathBuf> {
let output = std::process::Command::new("nu")
.args(["-n", "-c", "$nu.vendor-autoload-dirs | to json"])
.output();
if let Ok(out) = output {
if out.status.success() {
if let Ok(list) = serde_json::from_slice::<Vec<String>>(&out.stdout) {
return list.into_iter().map(PathBuf::from).collect();
}
}
}
Vec::new()
}
fn ask(prompt: &str) -> bool {
eprint!("{prompt}");
let mut input = String::new();
let _ = std::io::stdin().read_line(&mut input);
matches!(input.trim(), "y" | "Y")
}
fn test_write(path: &Path) -> bool {
if let Some(parent) = path.parent() {
if std::fs::create_dir_all(parent).is_err() {
return false;
}
}
let tmp = path.with_extension("tmp");
match std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp)
{
Ok(_) => {
let _ = std::fs::remove_file(&tmp);
true
}
Err(_) => false,
}
}
fn find_paths() -> Result<(PathBuf, PathBuf), String> {
let mut xs_path = None;
let mut stub_path = None;
let lib_candidates: Vec<PathBuf> = {
let mut v = Vec::new();
if let Some(conf) = config_dir() {
v.push(conf.join("nushell").join("scripts").join("xs.nu"));
}
if let Ok(extra) = std::env::var("NU_LIB_DIRS") {
for dir in std::env::split_paths(&extra) {
let candidate = if dir.ends_with("scripts") {
dir.join("xs.nu")
} else {
dir.join("scripts").join("xs.nu")
};
v.push(candidate);
}
}
v
};
for cand in lib_candidates {
if test_write(&cand) {
xs_path = Some(cand);
break;
}
}
let auto_candidates: Vec<PathBuf> = {
let mut v = Vec::new();
for dir in nu_vendor_autoload_dirs() {
v.push(dir.join("xs-use.nu"));
}
v
};
for cand in auto_candidates {
if test_write(&cand) {
stub_path = Some(cand);
break;
}
}
match (xs_path, stub_path) {
(Some(xs), Some(stub)) => Ok((xs, stub)),
_ => Err("Could not find writable install locations".into()),
}
}
fn install(
lib_path: Option<PathBuf>,
autoload_path: Option<PathBuf>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (xs_path, stub_path) = match (lib_path, autoload_path) {
(Some(lib), Some(auto)) => (lib, auto),
(None, None) => find_paths().map_err(std::io::Error::other)?,
_ => {
return Err("Both --lib-path and --autoload-path must be provided together".into());
}
};
let targets = vec![xs_path.clone(), stub_path.clone()];
println!("will install:");
for t in &targets {
if t.exists() {
println!(" {} (overwrite)", t.display());
} else {
println!(" {}", t.display());
}
}
if !ask("Proceed? (y/N) ") {
println!("aborted");
return Ok(());
}
std::fs::create_dir_all(xs_path.parent().unwrap())?;
std::fs::write(&xs_path, XS_NU)?;
println!("installed {}", xs_path.display());
let stub_content =
"# Autogenerated by `xs nu --install`\n# Load xs's commands every session\nuse xs.nu *\n";
std::fs::create_dir_all(stub_path.parent().unwrap())?;
std::fs::write(&stub_path, stub_content)?;
println!("installed {}", stub_path.display());
Ok(())
}
fn clean() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use std::collections::BTreeSet;
let mut targets = BTreeSet::new();
for dir in lib_dirs() {
let p = dir.join("xs.nu");
if p.exists() {
targets.insert(p);
}
}
for dir in autoload_dirs() {
let p = dir.join("xs-use.nu");
if p.exists() {
targets.insert(p);
}
}
if targets.is_empty() {
println!("no installed files found");
return Ok(());
}
println!("will remove:");
for t in &targets {
println!(" {}", t.display());
}
if !ask("Proceed? (y/N) ") {
println!("aborted");
return Ok(());
}
for t in &targets {
std::fs::remove_file(t)?;
println!("removed {}", t.display());
}
Ok(())
}
fn run_nu(cmd: CommandNu) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if cmd.clean {
clean()
} else if cmd.install {
install(cmd.lib_path, cmd.autoload_path)
} else {
print!("{XS_NU}");
Ok(())
}
}
fn run_scru128(cmd: CommandScru128) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match cmd.command {
Some(Scru128Command::Unpack { id }) => {
let result = xs::scru128::unpack(&id)?;
println!("{result}");
}
Some(Scru128Command::Pack) => {
let result = xs::scru128::pack()?;
println!("{result}");
}
None => {
let result = xs::scru128::generate()?;
println!("{result}");
}
}
Ok(())
}