use crate::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Serde `skip_serializing_if` predicate: true when the string is empty.
///
/// The `&Arc<str>` parameter type is dictated by serde, which passes the
/// field by reference.
fn p_no(s: &Arc<str>) -> bool {
    let text: &str = s;
    text.is_empty()
}
/// Default value for `CtxSetup::timeout_secs`, in seconds.
fn timeout_secs() -> f64 {
    const DEFAULT_TIMEOUT_SECS: f64 = 10.0;
    DEFAULT_TIMEOUT_SECS
}
/// Default value for `CtxSetup::max_heap_bytes`: 32 MiB.
fn max_heap_bytes() -> usize {
    const MIB: usize = 1 << 20;
    32 * MIB
}
/// Serde `skip_serializing_if` predicate: true when the flag is `false`.
///
/// Takes `&bool` rather than `bool` because serde passes the field by
/// reference.
fn is_false(b: &bool) -> bool {
    !*b
}
/// System-wide setup record.
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SysSetup {
/// Tokens accepted as system administrators (see `Server::check_sysadmin`).
/// Serialized under the key `"x"`; omitted from the output when empty.
#[serde(rename = "x", default, skip_serializing_if = "Vec::is_empty")]
pub sys_admin: Vec<Arc<str>>,
}
/// Per-context setup record; `Server::ctx_setup_put` requires a sysadmin
/// token to write it.
///
/// Every field has a short serialized key and a serde default, so a partial
/// document deserializes successfully.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CtxSetup {
/// Context identifier. Serialized as `"c"`; omitted when empty.
#[serde(rename = "c", default, skip_serializing_if = "p_no")]
pub ctx: Arc<str>,
/// Deletion flag. Serialized as `"d"`; omitted when `false`.
/// NOTE(review): the code visible here never reads this flag — presumably
/// it is acted on by the context or the object store; confirm.
#[serde(rename = "d", default, skip_serializing_if = "is_false")]
pub delete: bool,
/// Tokens accepted as administrators of this context (see
/// `Server::check_ctxadmin`). Serialized as `"x"`; omitted when empty.
#[serde(rename = "x", default, skip_serializing_if = "Vec::is_empty")]
pub ctx_admin: Vec<Arc<str>>,
/// Timeout in seconds. Serialized as `"t"`; defaults to `timeout_secs()`.
#[serde(rename = "t", default = "timeout_secs")]
pub timeout_secs: f64,
/// Heap limit in bytes. Serialized as `"h"`; defaults to
/// `max_heap_bytes()`.
#[serde(rename = "h", default = "max_heap_bytes")]
pub max_heap_bytes: usize,
}
impl Default for CtxSetup {
fn default() -> Self {
Self {
ctx: Default::default(),
delete: false,
ctx_admin: Default::default(),
timeout_secs: timeout_secs(),
max_heap_bytes: max_heap_bytes(),
}
}
}
impl CtxSetup {
fn check(&self) -> Result<()> {
safe_str(&self.ctx)?;
for token in self.ctx_admin.iter() {
safe_str(token)?;
}
if self.max_heap_bytes < 1024 * 1024
|| self.max_heap_bytes / (1024 * 1024) > u32::MAX as usize
{
return Err(Error::other("invalid max heap bytes"));
}
Ok(())
}
}
/// Per-context configuration record; `Server::ctx_config_put` accepts it
/// from a context admin or a sysadmin.
///
/// `Debug` is implemented by hand for this type and prints only the length
/// of `code`, not its contents.
#[derive(Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct CtxConfig {
/// Context identifier. Serialized as `"c"`; omitted when empty.
#[serde(rename = "c", default, skip_serializing_if = "p_no")]
pub ctx: Arc<str>,
/// Tokens accepted as administrators of this context (see
/// `Server::check_ctxadmin`). Serialized as `"x"`; omitted when empty.
#[serde(rename = "x", default, skip_serializing_if = "Vec::is_empty")]
pub ctx_admin: Vec<Arc<str>>,
/// Code text for the context. Serialized as `"l"`; omitted when empty.
#[serde(rename = "l", default, skip_serializing_if = "p_no")]
pub code: Arc<str>,
/// JSON value accompanying the code. Serialized as `"e"`; omitted when it
/// is JSON `null`.
#[serde(
rename = "e",
default,
skip_serializing_if = "serde_json::Value::is_null"
)]
pub code_env: Arc<serde_json::Value>,
}
impl std::fmt::Debug for CtxConfig {
    /// Formats the config for diagnostics, reporting the size of `code`
    /// (as `code_bytes`) in place of the code text itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code_bytes = self.code.len();
        let mut out = f.debug_struct("CtxConfig");
        out.field("ctx", &self.ctx);
        out.field("ctx_admin", &self.ctx_admin);
        out.field("code_bytes", &code_bytes);
        out.field("code_env", &self.code_env);
        out.finish()
    }
}
impl CtxConfig {
    /// Validates the context name and then every admin token with
    /// `safe_str`, returning the first failure.
    fn check(&self) -> Result<()> {
        let names = std::iter::once(&self.ctx).chain(self.ctx_admin.iter());
        for name in names {
            safe_str(name)?;
        }
        Ok(())
    }
}
/// Request-handling front end: holds in-memory copies of the stored setup
/// records and the set of instantiated contexts.
pub struct Server {
// Handle used to reach the object store (`obj()`) and messaging (`msg()`).
runtime: RuntimeHandle,
// In-memory copy of the system setup; replaced by `set_sys_admin`.
sys_setup: Mutex<SysSetup>,
// Setup and config per context name; updated by the `ctx_*_put` handlers.
ctx_setup: Mutex<HashMap<Arc<str>, (CtxSetup, CtxConfig)>>,
// Instantiated contexts per context name; filled by `setup_context`.
ctx_map: Mutex<HashMap<Arc<str>, Arc<crate::ctx::Ctx>>>,
}
impl Server {
/// Creates a server from the state held in the object store.
///
/// Loads the system setup and every stored context record, then
/// instantiates each context. Any load or instantiation failure aborts
/// construction and is returned to the caller.
pub async fn new(runtime: RuntimeHandle) -> Result<Self> {
    let sys_setup = runtime.runtime().obj()?.get_sys_setup().await?;
    let stored = runtime.runtime().obj()?.list_ctx_all().await?;

    // The map keeps a copy; the loaded records are consumed below.
    let server = Self {
        runtime,
        sys_setup: Mutex::new(sys_setup),
        ctx_setup: Mutex::new(stored.clone()),
        ctx_map: Mutex::new(HashMap::new()),
    };

    for (name, (setup, config)) in stored {
        server.setup_context(name, setup, config).await?;
    }

    Ok(server)
}
/// Builds a `Ctx` from the given setup and config and registers it under
/// `ctx`, replacing any instance previously registered under that name.
async fn setup_context(
    &self,
    ctx: Arc<str>,
    setup: CtxSetup,
    config: CtxConfig,
) -> Result<()> {
    let name = ctx.clone();
    let instance =
        crate::ctx::Ctx::new(name, setup, config, self.runtime.runtime())
            .await?;
    // The lock is taken only after the await, for the insert alone.
    self.ctx_map.lock().unwrap().insert(ctx, instance);
    Ok(())
}
fn get_sys_setup(&self) -> SysSetup {
self.sys_setup.lock().unwrap().clone()
}
fn get_ctx_setup(&self, ctx: &str) -> Result<(CtxSetup, CtxConfig)> {
self.ctx_setup
.lock()
.unwrap()
.get(ctx)
.cloned()
.ok_or_else(|| Error::not_found(format!("no context: {ctx}")))
}
fn check_sysadmin(&self, token: &Arc<str>) -> Result<()> {
if !self.get_sys_setup().sys_admin.contains(token) {
return Err(Error::unauthorized(
"action requires sysadmin permissions",
));
}
Ok(())
}
/// Authorizes `token` for `ctx` and returns that context's stored records.
///
/// A token is accepted when it is a sysadmin token, or when it appears in
/// the `ctx_admin` list of either the context's setup or its config.
///
/// # Errors
///
/// Returns a not-found error when `ctx` has no stored record (checked
/// first), or an unauthorized error when the token is not accepted.
fn check_ctxadmin(
    &self,
    token: &Arc<str>,
    ctx: &Arc<str>,
) -> Result<(CtxSetup, CtxConfig)> {
    let (cur_setup, cur_config) = self.get_ctx_setup(ctx)?;

    let allowed = self.get_sys_setup().sys_admin.contains(token)
        || cur_setup.ctx_admin.contains(token)
        || cur_config.ctx_admin.contains(token);

    if !allowed {
        return Err(Error::unauthorized(
            "action requires ctxadmin permissions",
        ));
    }
    Ok((cur_setup, cur_config))
}
/// Replaces the list of sysadmin tokens.
///
/// Every token is validated with `safe_str` first. The new setup is written
/// to the object store before the in-memory copy is replaced, so a failed
/// write leaves the in-memory state untouched.
pub async fn set_sys_admin(&self, sys_admin: Vec<Arc<str>>) -> Result<()> {
    for token in &sys_admin {
        safe_str(token)?;
    }

    let mut updated = self.get_sys_setup();
    updated.sys_admin = sys_admin;

    self.runtime
        .runtime()
        .obj()?
        .set_sys_setup(updated.clone())
        .await?;

    *self.sys_setup.lock().unwrap() = updated;
    Ok(())
}
/// Health probe: emits a trace event and always succeeds.
pub async fn health_get(&self) -> Result<()> {
    tracing::trace!(request = "health_get");
    Ok(())
}
/// Stores a context setup and (re)instantiates the context from it.
///
/// Requires a sysadmin token. The setup is validated, written to the object
/// store, recorded in memory (creating a default config entry if the
/// context is new), and then the context is rebuilt from the current
/// setup/config pair.
pub async fn ctx_setup_put(
    &self,
    token: Arc<str>,
    setup: CtxSetup,
) -> Result<()> {
    self.check_sysadmin(&token)?;
    setup.check()?;

    self.runtime
        .runtime()
        .obj()?
        .set_ctx_setup(setup.clone())
        .await?;

    let ctx = setup.ctx.clone();
    // The guard is scoped to this block so it is released before awaiting.
    let (ctx_setup, ctx_config) = {
        let mut map = self.ctx_setup.lock().unwrap();
        let entry = map.entry(ctx.clone()).or_default();
        entry.0 = setup;
        entry.clone()
    };

    tracing::trace!(request = "ctx_setup", ?ctx_setup, ?ctx_config);
    self.setup_context(ctx, ctx_setup, ctx_config).await?;
    Ok(())
}
/// Stores a context config and (re)instantiates the context from it.
///
/// Requires a token accepted by `check_ctxadmin` for `config.ctx`; because
/// that check looks up the stored record, the context must already exist.
/// The config is then validated, written to the object store, recorded in
/// memory, and the context is rebuilt from the current setup/config pair.
pub async fn ctx_config_put(
    &self,
    token: Arc<str>,
    config: CtxConfig,
) -> Result<()> {
    self.check_ctxadmin(&token, &config.ctx)?;
    config.check()?;

    self.runtime
        .runtime()
        .obj()?
        .set_ctx_config(config.clone())
        .await?;

    let ctx = config.ctx.clone();
    // The guard is scoped to this block so it is released before awaiting.
    let (ctx_setup, ctx_config) = {
        let mut map = self.ctx_setup.lock().unwrap();
        let entry = map.entry(ctx.clone()).or_default();
        entry.1 = config;
        entry.clone()
    };

    tracing::trace!(request = "ctx_config", ?ctx_setup, ?ctx_config);
    self.setup_context(ctx, ctx_setup, ctx_config).await?;
    Ok(())
}
/// Looks up the message receiver for `msg_id` within `ctx`.
///
/// Returns `None` when the messaging subsystem is unavailable (the error
/// from `msg()` is discarded) or when `get_recv` itself yields `None`.
pub async fn msg_listen(
    &self,
    ctx: Arc<str>,
    msg_id: Arc<str>,
) -> Option<crate::msg::DynMsgRecv> {
    tracing::trace!(request = "msg_listen", ?ctx, ?msg_id);

    // Kept as one chain so the intermediate handles live exactly as long as
    // the call needs them.
    self.runtime
        .runtime()
        .msg()
        .ok()?
        .get_recv(ctx, msg_id)
        .await
}
/// Writes every stored object into a new `backup.zip` in the working
/// directory.
///
/// Requires a sysadmin token. Objects are listed in pages of 200, each page
/// starting after the `created_secs` of the last object seen. Every object
/// becomes one archive entry named by a running number (`1`, `2`, ...),
/// holding the MessagePack encoding of `(meta, data)`.
///
/// The archive is finalised explicitly once the listing is exhausted, so a
/// failure while writing the central directory is returned to the caller
/// rather than being swallowed when the writer is dropped.
///
/// # Errors
///
/// Fails if the token is not a sysadmin token, if `backup.zip` already
/// exists (`create_new`), or on any object-store, encoding, or I/O error.
pub async fn obj_backup_full(&self, token: Arc<str>) -> Result<()> {
    self.check_sysadmin(&token)?;

    // File and zip operations block, so they run on the blocking pool.
    let mut zip = tokio::task::spawn_blocking(|| {
        let file = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open("backup.zip")?;
        std::io::Result::Ok(zip::ZipWriter::new(file))
    })
    .await??;

    let mut created_gt = 0.0;
    let mut file_no = 1;

    loop {
        let meta_list = self
            .runtime
            .runtime()
            .obj()?
            .list("", created_gt, 200)
            .await?;
        if meta_list.is_empty() {
            break;
        }

        for meta in meta_list {
            // NOTE(review): paging resumes strictly after this timestamp;
            // if several objects can share a `created_secs` across a page
            // boundary they may be skipped — confirm against `list`.
            created_gt = meta.created_secs();

            let (meta, data) =
                self.runtime.runtime().obj()?.get(meta).await?;
            let meta2 = meta.clone();

            // The writer is moved into the blocking task and handed back.
            zip = tokio::task::spawn_blocking(move || {
                use std::io::Write;
                let enc = rmp_serde::to_vec(&(meta2, data))
                    .map_err(std::io::Error::other)?;
                zip.start_file(
                    file_no.to_string(),
                    zip::write::SimpleFileOptions::default(),
                )?;
                zip.write_all(&enc)?;
                std::io::Result::Ok(zip)
            })
            .await??;

            file_no += 1;
            tracing::info!(%meta, "backup file");
        }
    }

    // Finalise the archive explicitly: dropping the writer would also try,
    // but it would hide any error and block the async executor thread.
    tokio::task::spawn_blocking(move || {
        zip.finish()?;
        std::io::Result::Ok(())
    })
    .await??;

    Ok(())
}
/// Restores every object found in `backup.zip` in the working directory.
///
/// Requires a sysadmin token. Entries are read by index; each one is
/// decoded from MessagePack into `(meta, data)` and written to the object
/// store. The first failure aborts the restore.
pub async fn obj_restore_full(&self, token: Arc<str>) -> Result<()> {
    self.check_sysadmin(&token)?;

    // File and zip operations block, so they run on the blocking pool.
    let (mut zip, count) = tokio::task::spawn_blocking(|| {
        let file = std::fs::File::open("backup.zip")?;
        let archive = zip::ZipArchive::new(file)?;
        let entries = archive.len();
        std::io::Result::Ok((archive, entries))
    })
    .await??;

    for idx in 0..count {
        // The archive is moved into the blocking task and handed back.
        let (returned, meta, data) = tokio::task::spawn_blocking(move || {
            use std::io::Read;

            let mut raw = Vec::new();
            {
                let mut entry = zip.by_index(idx)?;
                entry.read_to_end(&mut raw)?;
            }
            let (meta, data): (crate::obj::ObjMeta, bytes::Bytes) =
                rmp_serde::from_slice(&raw)
                    .map_err(std::io::Error::other)?;
            std::io::Result::Ok((zip, meta, data))
        })
        .await??;
        zip = returned;

        self.runtime
            .runtime()
            .obj()?
            .put(meta.clone(), data)
            .await?;
        tracing::info!(%meta, "restore file");
    }

    Ok(())
}
/// Lists object metadata under `prefix` within `ctx`.
///
/// Requires a token accepted by `check_ctxadmin`. On success the combined
/// length of the returned metadata is metered as egress for `ctx`; nothing
/// is metered on failure.
pub async fn obj_list(
    &self,
    token: Arc<str>,
    ctx: Arc<str>,
    prefix: Arc<str>,
    created_gt: f64,
    limit: u32,
) -> Result<Vec<crate::obj::ObjMeta>> {
    self.check_ctxadmin(&token, &ctx)?;

    // Scope the caller's prefix to this context's key space.
    let prefix =
        format!("{}/{}/{prefix}", crate::obj::ObjMeta::SYS_CTX, ctx);
    tracing::trace!(
        request = "obj_list",
        ?ctx,
        ?prefix,
        ?created_gt,
        ?limit
    );

    let listed = self
        .runtime
        .runtime()
        .obj()?
        .list(&prefix, created_gt, limit)
        .await?;

    let egress: usize = listed.iter().map(|m| m.len()).sum();
    crate::meter::meter_egress_byte(&ctx, egress as u128);

    Ok(listed)
}
/// Fetches one object of `ctx` by its application path.
///
/// Requires a token accepted by `check_ctxadmin`. On success the combined
/// length of the metadata and the data is metered as egress for `ctx`.
pub async fn obj_get(
    &self,
    token: Arc<str>,
    ctx: Arc<str>,
    app_path: String,
) -> Result<(crate::obj::ObjMeta, bytes::Bytes)> {
    self.check_ctxadmin(&token, &ctx)?;

    // Lookup key built with zeroed numeric fields.
    let meta =
        crate::obj::ObjMeta::new_context(&ctx, &app_path, 0.0, 0.0, 0.0);
    tracing::trace!(request = "obj_get", ?ctx, ?meta);

    let (found, data) = self.runtime.runtime().obj()?.get(meta).await?;

    let egress = found.len() + data.len();
    crate::meter::meter_egress_byte(&ctx, egress as u128);

    Ok((found, data))
}
pub async fn obj_put(
&self,
token: Arc<str>,
meta: crate::obj::ObjMeta,
data: bytes::Bytes,
) -> Result<crate::obj::ObjMeta> {
let ctx: Arc<str> = meta.ctx().into();
self.check_ctxadmin(&token, &ctx)?;
let cs = meta.created_secs();
let cs = if cs < 1.0 {
safe_now().to_string()
} else {
meta.0.split('/').nth(3).unwrap_or("").to_string()
};
let meta = crate::obj::ObjMeta(
format!(
"c/{ctx}/{}/{cs}/{}/{}",
meta.app_path(),
meta.expires_secs(),
data.len(),
)
.into(),
);
tracing::trace!(request = "obj_put", ?ctx, ?meta);
let c = match self.ctx_map.lock().unwrap().get(&ctx) {
None => {
return Err(Error::not_found(format!(
"invalid context: {ctx}"
)));
}
Some(c) => c.clone(),
};
c.obj_check_req(meta.clone(), data.clone()).await?;
self.runtime
.runtime()
.obj()?
.put(meta.clone(), data)
.await?;
Ok(meta)
}
pub async fn fn_req(
&self,
ctx: Arc<str>,
req: crate::js::JsRequest,
) -> Result<crate::js::JsResponse> {
let req_id = rid();
tracing::trace!(request = "fn_req", %req_id, ?ctx, ?req);
let c = match self.ctx_map.lock().unwrap().get(&ctx) {
None => {
tracing::trace!(request = "fn_req", ?ctx, "invalid context");
return Err(Error::not_found(format!(
"invalid context: {ctx}"
)));
}
Some(c) => c.clone(),
};
let res = c.fn_req(req).await;
tracing::trace!(request = "fn_req", %req_id, ?ctx, ?res);
use crate::js::JsResponse::FnResOk;
if let Ok(FnResOk { body, headers, .. }) = &res {
let mut egress_gib = body.len();
for (k, v) in headers {
egress_gib += k.len();
egress_gib += v.len();
}
crate::meter::meter_egress_byte(&ctx, egress_gib as u128);
}
res
}
}
/// Returns the next request id from a process-wide counter that starts at 1.
///
/// Relaxed ordering is used: only the uniqueness of each returned value is
/// needed, not ordering relative to other memory operations.
fn rid() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}