use std::sync::Arc;
use voidmerge::*;
fn help() {
println!(include_str!("help.txt"));
}
fn def_split_env(
args: &mut minimist::Minimist,
key: &str,
env: impl Into<std::ffi::OsString>,
) {
if let Some(val) = std::env::var_os(env.into()) {
let r = args.entry(key.into()).or_default();
for val in val.to_string_lossy().split(',') {
r.push(val.into());
}
}
}
fn arg_parse() -> Result<Arg> {
let mut args = minimist::Minimist::parse(std::env::args_os().skip(1));
let mut cmd = args
.to_one_str(minimist::Minimist::POS)
.unwrap_or_else(|| "help".into());
if args.as_flag("v") || args.as_flag("version") {
cmd = "version".into();
}
if args.as_flag("h") || args.as_flag("help") {
cmd = "help".into();
}
macro_rules! exp {
($a:ident, $t:literal) => {
$a.to_one_str($t).ok_or_else(|| {
Error::invalid(concat!(
"Argument Error: --",
$t,
" is required"
))
})?
};
}
macro_rules! exp_path {
($a:ident, $t:literal) => {
$a.as_one_path($t).ok_or_else(|| {
Error::invalid(concat!(
"Argument Error: --",
$t,
" is required"
))
})?
};
}
match cmd.as_ref() {
"help" => Ok(Arg::Help),
"version" => Ok(Arg::Version),
"serve" => {
def_split_env(&mut args, "sys-admin", "VM_SYS_ADMIN_TOKENS");
args.entry("sys-admin".into()).or_default();
args.set_default_env("http-addr", "VM_HTTP_ADDR");
args.set_default("http-addr", "[::]:8080");
args.set_default_env("store", "VM_STORE");
Ok(Arg::Serve {
sys_admin: args
.to_list_str("sys-admin")
.expect("--sys-admin is required")
.map(|s| s.into())
.collect::<Vec<_>>(),
http_addr: exp!(args, "http-addr").into(),
store: args.as_one_path("store").map(|p| p.to_owned()),
})
}
"test" => {
args.set_default_env("http-addr", "VM_HTTP_ADDR");
args.set_default("http-addr", "127.0.0.1:8080");
args.set_default_env("code-file", "VM_CODE");
args.set_default_env("code-env", "VM_ENV");
Ok(Arg::Test {
http_addr: exp!(args, "http-addr").into(),
code_file: exp_path!(args, "code-file").into(),
code_env: args.as_one_path("code-env").map(ToOwned::to_owned),
})
}
"health" => {
args.set_default_env("url", "VM_URL");
Ok(Arg::Health {
url: exp!(args, "url").into(),
})
}
"ctx-setup" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
args.set_default_env("delete", "VM_DELETE");
def_split_env(&mut args, "ctx-admin", "VM_CTX_ADMIN_TOKENS");
args.entry("ctx-admin".into()).or_default();
args.set_default_env("timeout-secs", "VM_TIMEOUT_SECS");
args.set_default("timeout-secs", "10.0");
args.set_default_env("max-heap-bytes", "VM_MAX_HEAP_BYTES");
args.set_default("max-heap-bytes", "33554432");
Ok(Arg::CtxSetup {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
delete: args.as_flag("delete"),
ctx_admin: args
.to_list_str("ctx-admin")
.expect("--sys-admin is required")
.map(|s| s.into())
.collect::<Vec<_>>(),
timeout_secs: exp!(args, "timeout-secs")
.parse()
.map_err(Error::other)?,
max_heap_bytes: exp!(args, "max-heap-bytes")
.parse()
.map_err(Error::other)?,
})
}
"ctx-config" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
def_split_env(&mut args, "ctx-admin", "VM_CTX_ADMIN_TOKENS");
args.entry("ctx-admin".into()).or_default();
args.set_default_env("code-file", "VM_CODE");
args.set_default_env("code-env", "VM_ENV");
Ok(Arg::CtxConfig {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
ctx_admin: args
.to_list_str("ctx-admin")
.expect("--sys-admin is required")
.map(|s| s.into())
.collect::<Vec<_>>(),
code_file: exp_path!(args, "code-file").into(),
code_env: args.as_one_path("code-env").map(ToOwned::to_owned),
})
}
"obj-list" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
args.set_default_env("prefix", "VM_PREFIX");
args.set_default("prefix", "");
args.set_default_env("created-gt", "VM_CREATED_GT");
args.set_default("created-gt", "0.0");
args.set_default_env("limit", "VM_LIMIT");
args.set_default("limit", "4294967295");
Ok(Arg::ObjList {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
prefix: exp!(args, "prefix").into(),
created_gt: exp!(args, "created-gt")
.parse()
.map_err(Error::other)?,
limit: exp!(args, "limit").parse().map_err(Error::other)?,
})
}
"obj-get" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
args.set_default_env("app-path", "VM_APP_PATH");
args.set_default("app-path", "");
Ok(Arg::ObjGet {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
app_path: exp!(args, "app-path").into(),
})
}
"obj-put" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
args.set_default_env("app-path", "VM_APP_PATH");
args.set_default("app-path", "");
args.set_default_env("create", "VM_CREATE");
args.set_default("create", safe_now().to_string());
args.set_default_env("expire", "VM_EXPIRE");
args.set_default("expire", "0.0");
Ok(Arg::ObjPut {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
app_path: exp!(args, "app-path").into(),
create: exp!(args, "create").into(),
expire: exp!(args, "expire").into(),
})
}
"obj-backup-full" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
Ok(Arg::ObjBackupFull {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
})
}
"obj-restore-full" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
Ok(Arg::ObjRestoreFull {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
})
}
"obj-backup" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
args.set_default_env("created-gt", "VM_CREATED_GT");
args.set_default("created-gt", "0.0");
args.set_default_env("zip-file", "VM_ZIP_FILE");
Ok(Arg::ObjBackup {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
created_gt: exp!(args, "created-gt")
.parse()
.map_err(Error::other)?,
zip_file: exp_path!(args, "zip-file").into(),
})
}
"obj-restore" => {
args.set_default_env("url", "VM_URL");
args.set_default_env("token", "VM_TOKEN");
args.set_default_env("context", "VM_CTX");
args.set_default_env("zip-file", "VM_ZIP_FILE");
Ok(Arg::ObjRestore {
url: exp!(args, "url").into(),
token: exp!(args, "token").into(),
context: exp!(args, "context").into(),
zip_file: exp_path!(args, "zip-file").into(),
})
}
unk => Err(Error::other(format!("unrecognised command: {unk}"))),
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::prelude::*;
let filter_layer = tracing_subscriber::EnvFilter::builder()
.with_env_var("VM_LOG")
.with_default_directive(
tracing_subscriber::filter::LevelFilter::INFO.into(),
)
.from_env_lossy();
let fmt_layer = tracing_subscriber::fmt::layer().json();
let sub = tracing_subscriber::Registry::default()
.with(filter_layer)
.with(fmt_layer);
if std::env::var_os("OTEL_EXPORTER_OTLP_ENDPOINT").is_some() {
let log_exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("initialize otel logging exporter");
let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
.with_batch_exporter(log_exporter)
.build();
let otel_layer =
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&provider,
);
sub.with(otel_layer).init();
} else {
sub.init();
}
if std::env::var_os("OTEL_EXPORTER_OTLP_ENDPOINT").is_some() {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("initialize otel metrics exporter");
let meter_provider =
opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_periodic_exporter(exporter)
.with_resource(opentelemetry_sdk::Resource::builder().build())
.build();
opentelemetry::global::set_meter_provider(meter_provider.clone());
}
voidmerge::meter::meter_init();
let arg = match arg_parse() {
Ok(arg) => arg,
Err(err) => {
eprintln!("\n-----\n{err}\n-----");
eprintln!("\n`vm --help` for additional info");
std::process::exit(1);
}
};
arg.exec().await
}
#[derive(Debug)]
enum Arg {
Help,
Version,
Serve {
sys_admin: Vec<Arc<str>>,
http_addr: String,
store: Option<std::path::PathBuf>,
},
Test {
http_addr: String,
code_file: std::path::PathBuf,
code_env: Option<std::path::PathBuf>,
},
Health {
url: String,
},
CtxSetup {
url: String,
token: Arc<str>,
context: Arc<str>,
delete: bool,
ctx_admin: Vec<Arc<str>>,
timeout_secs: f64,
max_heap_bytes: usize,
},
CtxConfig {
url: String,
token: Arc<str>,
context: Arc<str>,
ctx_admin: Vec<Arc<str>>,
code_file: std::path::PathBuf,
code_env: Option<std::path::PathBuf>,
},
ObjList {
url: String,
token: Arc<str>,
context: Arc<str>,
prefix: Arc<str>,
created_gt: f64,
limit: u32,
},
ObjGet {
url: String,
token: Arc<str>,
context: Arc<str>,
app_path: Arc<str>,
},
ObjPut {
url: String,
token: Arc<str>,
context: Arc<str>,
app_path: String,
create: String,
expire: String,
},
ObjBackupFull {
url: String,
token: Arc<str>,
},
ObjRestoreFull {
url: String,
token: Arc<str>,
},
ObjBackup {
url: String,
token: Arc<str>,
context: Arc<str>,
created_gt: f64,
zip_file: std::path::PathBuf,
},
ObjRestore {
url: String,
token: Arc<str>,
context: Arc<str>,
zip_file: std::path::PathBuf,
},
}
async fn serve(
s: tokio::sync::oneshot::Sender<std::net::SocketAddr>,
sys_admin: Vec<Arc<str>>,
http_addr: String,
store: Option<std::path::PathBuf>,
) -> Result<()> {
let http_addr: std::net::SocketAddr = http_addr.parse().map_err(|err| {
Error::other(err).with_info("failed to parse http server bind address")
})?;
let runtime = RuntimeHandle::default();
runtime.set_obj(obj::obj_file::ObjFile::create(store).await?);
runtime.set_js(js::JsExecMeter::create(js::JsExecDefault::create()));
runtime.set_msg(msg::MsgMem::create());
let server = server::Server::new(runtime).await?;
server.set_sys_admin(sys_admin).await?;
http_server::http_server(s, http_addr, server).await
}
impl Arg {
async fn exec(self) -> Result<()> {
tracing::info!(args = ?self);
match self {
Self::Help => {
help();
Ok(())
}
Self::Version => {
println!(
"{} {}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
);
Ok(())
}
Self::Serve {
sys_admin,
http_addr,
store,
} => {
let (s, r) = tokio::sync::oneshot::channel();
tokio::task::spawn(async move {
if let Ok(addr) = r.await {
eprintln!("#vm#listening#{addr:?}#");
}
});
serve(s, sys_admin, http_addr, store).await
}
Self::Test {
http_addr,
code_file,
code_env,
} => {
let code: Arc<str> =
tokio::fs::read_to_string(code_file).await?.into();
let code_env: serde_json::Value =
if let Some(code_env) = code_env {
serde_json::from_str(
&tokio::fs::read_to_string(code_env).await?,
)?
} else {
serde_json::Value::Null
};
let (s, r) = tokio::sync::oneshot::channel();
tokio::task::spawn(async move {
let addr = match r.await {
Ok(addr) => addr,
Err(err) => {
panic!("failed to start test server: {err:?}")
}
};
let url = format!("http://{addr:?}");
let client = voidmerge::http_client::HttpClient::new(
Default::default(),
);
let mut is_healthy = false;
for _ in 0..10 {
tokio::time::sleep(std::time::Duration::from_millis(
100,
))
.await;
if client.health(&url).await.is_ok() {
is_healthy = true;
break;
}
}
if !is_healthy {
panic!(
"failed to get healthy response from test server"
);
}
if let Err(err) = client
.ctx_setup(
&url,
"test",
crate::server::CtxSetup {
ctx: "test".into(),
delete: false,
ctx_admin: vec!["test".into()],
timeout_secs: 10.0,
max_heap_bytes: 33554432,
},
)
.await
{
panic!("failed to setup test server context: {err:?}");
}
if let Err(err) = client
.ctx_config(
&url,
"test",
crate::server::CtxConfig {
ctx: "test".into(),
ctx_admin: vec!["test".into()],
code,
code_env: code_env.into(),
},
)
.await
{
panic!("failed to setup test server context: {err:?}");
}
eprintln!("#vm#listening#{addr:?}#");
});
serve(s, vec!["test".into()], http_addr, None).await
}
Self::Health { url } => {
let client =
voidmerge::http_client::HttpClient::new(Default::default());
client.health(&url).await
}
Self::CtxSetup {
url,
token,
context,
delete,
ctx_admin,
timeout_secs,
max_heap_bytes,
} => {
let ctx_setup = crate::server::CtxSetup {
ctx: context,
delete,
ctx_admin,
timeout_secs,
max_heap_bytes,
};
let client =
voidmerge::http_client::HttpClient::new(Default::default());
client.ctx_setup(&url, &token, ctx_setup).await
}
Self::CtxConfig {
url,
token,
context,
ctx_admin,
code_file,
code_env,
} => {
let code = tokio::fs::read_to_string(code_file).await?.into();
let code_env: serde_json::Value =
if let Some(code_env) = code_env {
serde_json::from_str(
&tokio::fs::read_to_string(code_env).await?,
)?
} else {
serde_json::Value::Null
};
let ctx_config = crate::server::CtxConfig {
ctx: context,
ctx_admin,
code,
code_env: code_env.into(),
};
let client =
voidmerge::http_client::HttpClient::new(Default::default());
client.ctx_config(&url, &token, ctx_config).await
}
Self::ObjList {
url,
token,
context,
prefix,
mut created_gt,
mut limit,
} => {
let client =
voidmerge::http_client::HttpClient::new(Default::default());
let mut count = 0;
while limit > 1000 {
let next_count = std::cmp::min(1000, limit);
limit -= next_count;
let res = client
.obj_list(
&url, &context, &token, &prefix, created_gt,
next_count,
)
.await?;
if res.is_empty() {
break;
}
for r in res {
let created_secs = r.created_secs();
if created_secs > created_gt {
created_gt = created_secs;
}
count += 1;
println!("{r}");
}
}
eprintln!("#vm#list-count#{count}#");
Ok(())
}
Self::ObjGet {
url,
token,
context,
app_path,
} => {
let client =
voidmerge::http_client::HttpClient::new(Default::default());
let (meta, data) =
client.obj_get(&url, &context, &token, &app_path).await?;
eprintln!("#vm#meta#{meta}#");
use tokio::io::AsyncWriteExt;
tokio::io::stdout().write_all(&data).await?;
Ok(())
}
Self::ObjPut {
url,
token,
context,
app_path,
create,
expire,
} => {
use tokio::io::AsyncReadExt;
let mut data = Vec::new();
tokio::io::stdin().read_to_end(&mut data).await?;
let meta = crate::obj::ObjMeta(
format!("c/{context}/{app_path}/{create}/{expire}").into(),
);
let client =
voidmerge::http_client::HttpClient::new(Default::default());
let meta =
client.obj_put(&url, &token, meta, data.into()).await?;
eprintln!("#vm#meta#{meta}#");
Ok(())
}
Self::ObjBackupFull { url, token } => {
let client =
voidmerge::http_client::HttpClient::new(Default::default());
client.obj_backup_full(&url, &token).await?;
eprintln!("#vm#obj-backup-full#complete#");
Ok(())
}
Self::ObjRestoreFull { url, token } => {
let client =
voidmerge::http_client::HttpClient::new(Default::default());
client.obj_restore_full(&url, &token).await?;
eprintln!("#vm#obj-restore-full#complete#");
Ok(())
}
Self::ObjBackup {
url,
token,
context,
mut created_gt,
zip_file,
} => {
let file = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(zip_file)?;
let mut file = zip::ZipWriter::new(file);
let client =
voidmerge::http_client::HttpClient::new(Default::default());
loop {
let res = client
.obj_list(&url, &token, &context, "", created_gt, 1000)
.await?;
if res.is_empty() {
break;
}
for r in res {
let created_secs = r.created_secs();
if created_secs > created_gt {
created_gt = created_secs;
}
let (meta, data) = client
.obj_get(&url, &token, &context, r.app_path())
.await?;
println!("{meta}");
let path = meta.app_path().to_string();
#[derive(serde::Serialize)]
struct Item(crate::obj::ObjMeta, bytes::Bytes);
use voidmerge::bytes_ext::BytesExt;
let data =
bytes::Bytes::from_encode(&Item(meta, data))?;
file = tokio::task::spawn_blocking(move || {
use std::io::Write;
file.start_file(
path,
zip::write::SimpleFileOptions::default(),
)?;
file.write_all(&data)?;
std::io::Result::Ok(file)
})
.await??;
}
}
eprintln!("#vm#createdGt#{created_gt}#");
Ok(())
}
Self::ObjRestore {
url,
token,
context,
zip_file,
} => {
let file =
std::fs::OpenOptions::new().read(true).open(zip_file)?;
let mut file = zip::ZipArchive::new(file)?;
let client =
voidmerge::http_client::HttpClient::new(Default::default());
for idx in 0..file.len() {
let (tmp, meta, data) =
tokio::task::spawn_blocking(move || {
let mut out = Vec::new();
{
let mut read = file.by_index(idx)?;
use std::io::Read;
read.read_to_end(&mut out)?;
}
#[derive(serde::Deserialize)]
struct Item(crate::obj::ObjMeta, bytes::Bytes);
use voidmerge::bytes_ext::BytesExt;
let item: Item =
bytes::Bytes::from(out).to_decode()?;
Result::Ok((file, item.0, item.1))
})
.await??;
println!("{meta}");
file = tmp;
if meta.ctx() != &*context {
return Err(Error::other("context mismatch"));
}
client.obj_put(&url, &token, meta, data).await?;
}
Ok(())
}
}
}
}