use serde_json::Value;
use std::collections::HashMap;
/// Reads the pending host response and interprets it as a JSON array.
///
/// Returns `None` when the host has no response, the read fails, the bytes
/// are not valid UTF-8, the text is not valid JSON, or the JSON value is not
/// an array.
#[cfg(target_arch = "wasm32")]
pub(crate) fn read_batch_response() -> Option<Vec<Value>> {
    // `read_host_response_string` yields an empty string for every failure
    // mode (no response, failed read, invalid UTF-8). An empty string is not
    // valid JSON, so all of those collapse to `None` on the parse below.
    let raw = read_host_response_string();
    match serde_json::from_str::<Value>(&raw).ok()? {
        // Move the parsed elements out instead of deep-cloning the array.
        Value::Array(items) => Some(items),
        _ => None,
    }
}
// Host functions imported from the embedding runtime; only declared when
// compiling for wasm32. Callers in this file pass byte buffers as
// (pointer, length) pairs of i32 and fetch any payload the host produced
// through `get_host_response_len` followed by `get_host_response`.
#[cfg(target_arch = "wasm32")]
extern "C" {
// Log sink used by the `log` module, which passes levels 0..=3
// (error, warn, info, debug).
#[link_name = "cufflink_log"]
fn cufflink_log(level: i32, msg_ptr: i32, msg_len: i32);
// SQL entry points used by the `db` module. `db::query` treats a negative
// return as failure; `db::execute` hands the return value to its caller.
fn db_query(sql_ptr: i32, sql_len: i32) -> i32;
fn db_execute(sql_ptr: i32, sql_len: i32) -> i32;
// Response channel: callers first ask for the length, allocate a buffer of
// that size, then ask the host to fill it. The second call's return value is
// used by callers as the number of bytes written (<= 0 is treated as "none").
fn get_host_response_len() -> i32;
fn get_host_response(buf_ptr: i32, buf_len: i32) -> i32;
// Messaging entry points used by the `nats` module; both are treated as
// successful when they return 0.
fn nats_publish(subj_ptr: i32, subj_len: i32, payload_ptr: i32, payload_len: i32) -> i32;
fn nats_request(
subj_ptr: i32,
subj_len: i32,
payload_ptr: i32,
payload_len: i32,
timeout_ms: i32,
) -> i32;
// Single outbound HTTP call used by `http::fetch`; the headers buffer holds a
// JSON object built by the caller. A negative return is treated as failure.
fn http_fetch(
method_ptr: i32,
method_len: i32,
url_ptr: i32,
url_len: i32,
headers_ptr: i32,
headers_len: i32,
body_ptr: i32,
body_len: i32,
) -> i32;
// Configuration lookup used by `config::get` (negative return = failure).
fn get_config(key_ptr: i32, key_len: i32) -> i32;
// Object download used by `storage::download` (negative return = failure).
fn s3_download(bucket_ptr: i32, bucket_len: i32, key_ptr: i32, key_len: i32) -> i32;
// Batch variants: the buffer holds a JSON array built by the caller, and the
// response is read back as a JSON array of per-item slots.
fn s3_download_many(items_ptr: i32, items_len: i32) -> i32;
fn http_fetch_many(items_ptr: i32, items_len: i32) -> i32;
// Used by `image::transform_jpeg`, which returns the non-negative result to
// its caller as a `u32` and maps a negative result to `None`.
fn image_transform_jpeg(
bucket_ptr: i32,
bucket_len: i32,
in_key_ptr: i32,
in_key_len: i32,
out_key_ptr: i32,
out_key_len: i32,
max_dim: i32,
quality: i32,
) -> i32;
// Used by `storage::presign_upload` (negative return = failure).
fn s3_presign_upload(
bucket_ptr: i32,
bucket_len: i32,
key_ptr: i32,
key_len: i32,
content_type_ptr: i32,
content_type_len: i32,
expires_secs: i32,
) -> i32;
// Key/value entry points used by the `redis` module. `redis_set`/`redis_del`
// are treated as successful when they return 0. For `redis_get_status` the
// caller maps 1 to "no value", negative to an error and anything else to a
// value read from the response channel.
fn redis_get(key_ptr: i32, key_len: i32) -> i32;
fn redis_set(key_ptr: i32, key_len: i32, val_ptr: i32, val_len: i32, ttl_secs: i32) -> i32;
fn redis_del(key_ptr: i32, key_len: i32) -> i32;
fn redis_get_status(key_ptr: i32, key_len: i32) -> i32;
fn redis_mget(keys_ptr: i32, keys_len: i32) -> i32;
// Argument-less calls whose result is read from the response channel by the
// `util` and `context` modules.
fn generate_uuid() -> i32;
fn current_time() -> i32;
fn context_tenant() -> i32;
}
/// Identity and authorization data attached to an authenticated request.
///
/// Populated by `Request::from_json` from the `auth` object of the request
/// envelope.
#[derive(Debug, Clone)]
pub struct Auth {
    /// `sub` field of the auth payload (empty string when absent).
    pub sub: String,
    /// `preferred_username` field, when present as a string.
    pub preferred_username: Option<String>,
    /// `name` field, when present as a string.
    pub name: Option<String>,
    /// `email` field, when present as a string.
    pub email: Option<String>,
    /// Roles consulted by [`Auth::has_role`].
    pub realm_roles: Vec<String>,
    /// Arbitrary extra claims, keyed by claim name.
    pub claims: HashMap<String, Value>,
    /// Permission strings consulted by [`Auth::can`].
    pub permissions: Vec<String>,
    /// Role names consulted by [`Auth::has_cufflink_role`].
    pub role_names: Vec<String>,
    /// `is_service_account` flag of the auth payload (`false` when absent).
    pub is_service_account: bool,
}

impl Auth {
    /// Returns `true` when `role` appears verbatim in `realm_roles`.
    pub fn has_role(&self, role: &str) -> bool {
        self.realm_roles.iter().any(|held| held.as_str() == role)
    }

    /// Returns `true` when a permission grants `operation` on `area`.
    ///
    /// A permission matches when it is exactly `"<area>:<operation>"`, the
    /// area wildcard `"<area>:*"`, or the global wildcard `"*"`.
    pub fn can(&self, area: &str, operation: &str) -> bool {
        self.permissions.iter().any(|granted| {
            if granted.as_str() == "*" {
                return true;
            }
            // Peel off "<area>" and then ":"; whatever is left must be the
            // operation itself or the area-level wildcard.
            match granted
                .strip_prefix(area)
                .and_then(|rest| rest.strip_prefix(':'))
            {
                Some(granted_op) => granted_op == operation || granted_op == "*",
                None => false,
            }
        })
    }

    /// Returns `true` when `role` appears verbatim in `role_names`.
    pub fn has_cufflink_role(&self, role: &str) -> bool {
        self.role_names.iter().any(|held| held.as_str() == role)
    }

    /// Looks up an extra claim by name.
    pub fn claim(&self, key: &str) -> Option<&Value> {
        self.claims.get(key)
    }
}
/// Metadata about the background job a request was dispatched for.
///
/// Built by `Request::from_json` from the `_job` object of the envelope.
#[derive(Debug, Clone)]
pub struct JobContext {
    /// Job identifier as sent by the host.
    pub id: String,
    /// Attempt counter for this run; `is_retry` treats values above 1 as retries.
    pub attempt: u32,
    /// `max_attempts` value sent by the host.
    pub max_attempts: u32,
}

impl JobContext {
    /// Returns `true` when this is not the first attempt (`attempt >= 2`).
    pub fn is_retry(&self) -> bool {
        self.attempt >= 2
    }
}
/// A handler invocation decoded from the JSON envelope supplied by the host.
///
/// Fields are private; read them through the accessor methods.
#[derive(Debug, Clone)]
pub struct Request {
    method: String,
    handler: String,
    headers: HashMap<String, String>,
    body: Value,
    raw_body: Vec<u8>,
    tenant: String,
    service: String,
    auth: Option<Auth>,
    job: Option<JobContext>,
}

impl Request {
    /// Parses a request envelope.
    ///
    /// Returns `None` only when `json` is not valid JSON. Missing or
    /// wrongly-typed fields fall back to defaults:
    ///
    /// * `method` defaults to `"GET"`; `handler`, `tenant` and `service`
    ///   default to the empty string.
    /// * `headers` keeps only entries whose value is a JSON string.
    /// * `auth` is `Some` only when the `auth` field is a JSON object.
    /// * `body_raw_b64` is decoded with standard base64; an empty, missing
    ///   or undecodable value yields an empty raw body.
    /// * `_job` is `Some` only when `id`, `attempt` and `max_attempts` are
    ///   all present and the two counters fit in `u32`.
    pub fn from_json(json: &str) -> Option<Self> {
        let v: Value = serde_json::from_str(json).ok()?;

        let headers = v["headers"]
            .as_object()
            .map(|m| {
                m.iter()
                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                    .collect()
            })
            .unwrap_or_default();

        // Borrow the auth object in place; no need to clone it just to read it.
        let auth_json = &v["auth"];
        let auth = if auth_json.is_object() {
            // Optional string field of the auth object.
            let opt_string = |key: &str| auth_json[key].as_str().map(String::from);
            // String elements of an array field; non-strings are skipped and a
            // missing or non-array field yields an empty list.
            let string_list = |key: &str| -> Vec<String> {
                auth_json[key]
                    .as_array()
                    .map(|items| {
                        items
                            .iter()
                            .filter_map(|item| item.as_str().map(String::from))
                            .collect()
                    })
                    .unwrap_or_default()
            };
            Some(Auth {
                sub: auth_json["sub"].as_str().unwrap_or("").to_string(),
                preferred_username: opt_string("preferred_username"),
                name: opt_string("name"),
                email: opt_string("email"),
                realm_roles: string_list("realm_roles"),
                claims: auth_json["claims"]
                    .as_object()
                    .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
                    .unwrap_or_default(),
                permissions: string_list("permissions"),
                role_names: string_list("role_names"),
                is_service_account: auth_json["is_service_account"]
                    .as_bool()
                    .unwrap_or(false),
            })
        } else {
            None
        };

        let raw_body = v["body_raw_b64"]
            .as_str()
            .filter(|s| !s.is_empty())
            .and_then(|s| {
                use base64::{engine::general_purpose, Engine};
                general_purpose::STANDARD.decode(s).ok()
            })
            .unwrap_or_default();

        // All three job fields are required; a partial envelope yields `None`
        // rather than a half-filled `JobContext`.
        let job = v["_job"].as_object().and_then(|j| {
            let id = j.get("id")?.as_str()?.to_string();
            let attempt = u32::try_from(j.get("attempt")?.as_u64()?).ok()?;
            let max_attempts = u32::try_from(j.get("max_attempts")?.as_u64()?).ok()?;
            Some(JobContext {
                id,
                attempt,
                max_attempts,
            })
        });

        Some(Self {
            method: v["method"].as_str().unwrap_or("GET").to_string(),
            handler: v["handler"].as_str().unwrap_or("").to_string(),
            headers,
            body: v["body"].clone(),
            raw_body,
            tenant: v["tenant"].as_str().unwrap_or("").to_string(),
            service: v["service"].as_str().unwrap_or("").to_string(),
            auth,
            job,
        })
    }

    /// Request method (`"GET"` when the envelope did not specify one).
    pub fn method(&self) -> &str {
        &self.method
    }

    /// Name of the handler the request is addressed to.
    pub fn handler(&self) -> &str {
        &self.handler
    }

    /// All string-valued headers of the request.
    pub fn headers(&self) -> &HashMap<String, String> {
        &self.headers
    }

    /// Looks up a single header value.
    ///
    /// An exact-case match is preferred. Because HTTP header names are
    /// case-insensitive, a lookup that misses falls back to an ASCII
    /// case-insensitive scan, so `header("Content-Type")` finds a header
    /// stored as `content-type`. If several stored names differ only by
    /// case, which one the fallback returns is unspecified.
    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .get(name)
            .or_else(|| {
                self.headers
                    .iter()
                    .find(|(key, _)| key.eq_ignore_ascii_case(name))
                    .map(|(_, value)| value)
            })
            .map(String::as_str)
    }

    /// Parsed JSON body (`Value::Null` when absent).
    pub fn body(&self) -> &Value {
        &self.body
    }

    /// Bytes decoded from `body_raw_b64`; empty when none were supplied.
    pub fn raw_body(&self) -> &[u8] {
        &self.raw_body
    }

    /// Tenant identifier from the envelope (empty string when absent).
    pub fn tenant(&self) -> &str {
        &self.tenant
    }

    /// Service name from the envelope (empty string when absent).
    pub fn service(&self) -> &str {
        &self.service
    }

    /// Authentication context, if the request carried one.
    pub fn auth(&self) -> Option<&Auth> {
        self.auth.as_ref()
    }

    /// Job context, if the request was dispatched for a job.
    pub fn job(&self) -> Option<&JobContext> {
        self.job.as_ref()
    }

    /// Returns the authentication context, or a ready-made error response
    /// when the request is unauthenticated.
    ///
    /// NOTE(review): the error response is built with `Response::json`, so it
    /// carries status 200 and only mentions 401 inside the JSON body. Confirm
    /// whether the host expects `.with_status(401)` here.
    pub fn require_auth(&self) -> Result<&Auth, Response> {
        self.auth.as_ref().ok_or_else(|| {
            Response::json(&serde_json::json!({
                "error": "Authentication required",
                "status": 401
            }))
        })
    }
}
#[derive(Debug, Clone)]
pub struct Response {
data: String,
status: u16,
}
impl Response {
pub fn json(value: &Value) -> Self {
Self {
data: serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()),
status: 200,
}
}
pub fn text(s: &str) -> Self {
Self::json(&Value::String(s.to_string()))
}
pub fn error(message: &str) -> Self {
Self {
data: serde_json::json!({"error": message}).to_string(),
status: 400,
}
}
pub fn not_found(message: &str) -> Self {
Self {
data: serde_json::json!({"error": message}).to_string(),
status: 404,
}
}
pub fn forbidden(message: &str) -> Self {
Self {
data: serde_json::json!({"error": message}).to_string(),
status: 403,
}
}
pub fn empty() -> Self {
Self::json(&serde_json::json!({"ok": true}))
}
pub fn with_status(mut self, status: u16) -> Self {
self.status = status;
self
}
pub fn into_data(self) -> String {
if self.status == 200 {
self.data
} else {
serde_json::json!({
"__status": self.status,
"__body": serde_json::from_str::<Value>(&self.data).unwrap_or(Value::String(self.data)),
})
.to_string()
}
}
}
/// Database access through the host. On non-wasm32 targets every call is a
/// no-op stub.
pub mod db {
    use super::*;

    /// Runs `sql` through the host and returns the rows it produced.
    ///
    /// Returns an empty vector when the host call reports a negative status,
    /// when no response is available, or when the response is not a JSON
    /// array. On non-wasm32 targets the result is always empty.
    pub fn query(sql: &str) -> Vec<Value> {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { db_query(sql.as_ptr() as i32, sql.len() as i32) };
            if status >= 0 {
                read_host_response()
            } else {
                Vec::new()
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = sql;
            Vec::new()
        }
    }

    /// Runs `sql` and returns only the first row, if any.
    pub fn query_one(sql: &str) -> Option<Value> {
        let mut rows = query(sql).into_iter();
        rows.next()
    }

    /// Runs a statement and returns the host's integer result unchanged.
    ///
    /// On non-wasm32 targets this returns `0`.
    pub fn execute(sql: &str) -> i32 {
        #[cfg(target_arch = "wasm32")]
        {
            unsafe { db_execute(sql.as_ptr() as i32, sql.len() as i32) }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = sql;
            0
        }
    }

    /// Fetches the pending host response and parses it as a list of rows.
    ///
    /// Invalid UTF-8 is replaced lossily before parsing; any failure yields
    /// an empty vector.
    #[cfg(target_arch = "wasm32")]
    fn read_host_response() -> Vec<Value> {
        let expected = unsafe { get_host_response_len() };
        if expected <= 0 {
            return Vec::new();
        }
        let mut bytes = vec![0u8; expected as usize];
        let written = unsafe { get_host_response(bytes.as_mut_ptr() as i32, expected) };
        if written <= 0 {
            return Vec::new();
        }
        bytes.truncate(written as usize);
        let text = String::from_utf8_lossy(&bytes);
        match serde_json::from_str(&text) {
            Ok(rows) => rows,
            Err(_) => Vec::new(),
        }
    }
}
/// NATS messaging through the host. On non-wasm32 targets the functions are
/// stubs.
pub mod nats {
    #[allow(unused_imports)]
    use super::*;

    /// Publishes `payload` on `subject`.
    ///
    /// Returns `true` when the host call returns `0`. On non-wasm32 targets
    /// nothing is sent and `true` is returned.
    pub fn publish(subject: &str, payload: &str) -> bool {
        #[cfg(target_arch = "wasm32")]
        {
            let subj_bytes = subject.as_bytes();
            let payload_bytes = payload.as_bytes();
            let result = unsafe {
                nats_publish(
                    subj_bytes.as_ptr() as i32,
                    subj_bytes.len() as i32,
                    payload_bytes.as_ptr() as i32,
                    payload_bytes.len() as i32,
                )
            };
            result == 0
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (subject, payload);
            true
        }
    }

    /// Sends a request on `subject` and returns the reply as a string.
    ///
    /// `timeout_ms` is forwarded to the host unchanged. Returns `None` when
    /// the host call returns a non-zero status, when no reply is available,
    /// or when the reply is not valid UTF-8. On non-wasm32 targets this
    /// always returns `None`.
    pub fn request(subject: &str, payload: &str, timeout_ms: i32) -> Option<String> {
        #[cfg(target_arch = "wasm32")]
        {
            let subj_bytes = subject.as_bytes();
            let payload_bytes = payload.as_bytes();
            let result = unsafe {
                nats_request(
                    subj_bytes.as_ptr() as i32,
                    subj_bytes.len() as i32,
                    payload_bytes.as_ptr() as i32,
                    payload_bytes.len() as i32,
                    timeout_ms,
                )
            };
            if result != 0 {
                return None;
            }
            let len = unsafe { get_host_response_len() };
            if len <= 0 {
                return None;
            }
            let mut buf = vec![0u8; len as usize];
            let read = unsafe { get_host_response(buf.as_mut_ptr() as i32, len) };
            if read <= 0 {
                return None;
            }
            // `truncate` is a no-op if the host reports more bytes than the
            // buffer holds, whereas slicing `buf[..read]` would panic. It also
            // lets the buffer be reused without a copy, and matches the other
            // response readers in this file.
            buf.truncate(read as usize);
            String::from_utf8(buf).ok()
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (subject, payload, timeout_ms);
            None
        }
    }
}
/// Leveled logging through the host. On non-wasm32 targets messages are
/// discarded.
pub mod log {
    #[allow(unused_imports)]
    use super::*;

    // Numeric levels passed to the host log function.
    const LEVEL_ERROR: i32 = 0;
    const LEVEL_WARN: i32 = 1;
    const LEVEL_INFO: i32 = 2;
    const LEVEL_DEBUG: i32 = 3;

    /// Logs `msg` at level 0 (error).
    pub fn error(msg: &str) {
        write(LEVEL_ERROR, msg);
    }

    /// Logs `msg` at level 1 (warn).
    pub fn warn(msg: &str) {
        write(LEVEL_WARN, msg);
    }

    /// Logs `msg` at level 2 (info).
    pub fn info(msg: &str) {
        write(LEVEL_INFO, msg);
    }

    /// Logs `msg` at level 3 (debug).
    pub fn debug(msg: &str) {
        write(LEVEL_DEBUG, msg);
    }

    /// Hands the message bytes to the host log function.
    fn write(level: i32, msg: &str) {
        #[cfg(target_arch = "wasm32")]
        {
            unsafe {
                super::cufflink_log(level, msg.as_ptr() as i32, msg.len() as i32);
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (level, msg);
        }
    }
}
/// Outbound HTTP through the host. On non-wasm32 targets the functions are
/// stubs.
pub mod http {
    #[allow(unused_imports)]
    use super::*;

    /// Result of a completed fetch.
    #[derive(Debug, Clone)]
    pub struct FetchResponse {
        /// Status reported by the host (`0` when the host omitted it).
        pub status: i32,
        /// Response body as text; see `body_encoding`.
        pub body: String,
        /// Encoding label for `body` (`"utf8"` when the host omitted it).
        pub body_encoding: String,
        /// String-valued response headers.
        pub headers: HashMap<String, String>,
    }

    impl FetchResponse {
        /// Parses the body as JSON, returning `None` when it is not valid JSON.
        pub fn json(&self) -> Option<Value> {
            match serde_json::from_str(&self.body) {
                Ok(parsed) => Some(parsed),
                Err(_) => None,
            }
        }

        /// `true` for statuses in `200..=299`.
        pub fn is_success(&self) -> bool {
            self.status >= 200 && self.status < 300
        }

        /// `true` when `body_encoding` is exactly `"base64"`.
        pub fn is_base64(&self) -> bool {
            self.body_encoding.as_str() == "base64"
        }
    }

    /// Performs one HTTP request through the host.
    ///
    /// Headers are sent as a JSON object, so a repeated header name keeps a
    /// single value. A missing body is sent as an empty buffer. Returns
    /// `None` when the host call reports a negative status or the response
    /// cannot be read or parsed. On non-wasm32 targets this always returns
    /// `None`.
    pub fn fetch(
        method: &str,
        url: &str,
        headers: &[(&str, &str)],
        body: Option<&str>,
    ) -> Option<FetchResponse> {
        #[cfg(target_arch = "wasm32")]
        {
            let header_map: HashMap<&str, &str> = headers.iter().copied().collect();
            let header_json = serde_json::to_string(&header_map).unwrap_or_default();
            // `None` and `Some("")` both end up as a zero-length body.
            let payload = body.unwrap_or("");
            let status = unsafe {
                http_fetch(
                    method.as_ptr() as i32,
                    method.len() as i32,
                    url.as_ptr() as i32,
                    url.len() as i32,
                    header_json.as_ptr() as i32,
                    header_json.len() as i32,
                    payload.as_ptr() as i32,
                    payload.len() as i32,
                )
            };
            if status >= 0 {
                read_fetch_response()
            } else {
                None
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (method, url, headers, body);
            None
        }
    }

    /// `fetch` with method `GET` and no body.
    pub fn get(url: &str, headers: &[(&str, &str)]) -> Option<FetchResponse> {
        fetch("GET", url, headers, None)
    }

    /// `fetch` with method `POST`.
    pub fn post(url: &str, headers: &[(&str, &str)], body: &str) -> Option<FetchResponse> {
        fetch("POST", url, headers, Some(body))
    }

    /// `fetch` with method `PUT`.
    pub fn put(url: &str, headers: &[(&str, &str)], body: &str) -> Option<FetchResponse> {
        fetch("PUT", url, headers, Some(body))
    }

    /// `fetch` with method `DELETE` and no body.
    pub fn delete(url: &str, headers: &[(&str, &str)]) -> Option<FetchResponse> {
        fetch("DELETE", url, headers, None)
    }

    /// `fetch` with method `PATCH`.
    pub fn patch(url: &str, headers: &[(&str, &str)], body: &str) -> Option<FetchResponse> {
        fetch("PATCH", url, headers, Some(body))
    }

    /// One entry of a batched fetch; fields mirror the arguments of [`fetch`].
    #[derive(Debug, Clone)]
    pub struct FetchRequest<'a> {
        pub method: &'a str,
        pub url: &'a str,
        pub headers: &'a [(&'a str, &'a str)],
        pub body: Option<&'a str>,
    }

    /// Submits several requests to the host in a single call.
    ///
    /// When the host call fails or its response is not a JSON array, the
    /// result holds one `Err` per request. Otherwise each element of the
    /// host's array becomes one result. On non-wasm32 targets the result is
    /// always empty.
    pub fn fetch_many(requests: &[FetchRequest<'_>]) -> Vec<Result<FetchResponse, String>> {
        #[cfg(target_arch = "wasm32")]
        {
            // One identical error per submitted request.
            let fail_all = |reason: &str| -> Vec<Result<FetchResponse, String>> {
                requests
                    .iter()
                    .map(|_| Err(reason.to_string()))
                    .collect()
            };

            let mut items: Vec<Value> = Vec::with_capacity(requests.len());
            for r in requests {
                let h: HashMap<&str, &str> = r.headers.iter().copied().collect();
                items.push(serde_json::json!({
                    "method": r.method,
                    "url": r.url,
                    "headers": h,
                    "body": r.body,
                }));
            }
            let payload = serde_json::to_string(&items).unwrap_or_else(|_| "[]".to_string());
            let rc =
                unsafe { super::http_fetch_many(payload.as_ptr() as i32, payload.len() as i32) };
            if rc < 0 {
                return fail_all("http_fetch_many host call failed");
            }
            match super::read_batch_response() {
                Some(slots) => slots.into_iter().map(parse_fetch_slot).collect(),
                None => fail_all("malformed host response"),
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = requests;
            Vec::new()
        }
    }

    /// Builds a `FetchResponse` from a JSON object, defaulting missing fields.
    #[cfg(target_arch = "wasm32")]
    fn fetch_response_from_json(v: &Value) -> FetchResponse {
        let mut headers = HashMap::new();
        if let Some(map) = v["headers"].as_object() {
            for (name, value) in map {
                // Non-string header values are dropped.
                if let Some(text) = value.as_str() {
                    headers.insert(name.clone(), text.to_string());
                }
            }
        }
        FetchResponse {
            status: v["status"].as_i64().unwrap_or(0) as i32,
            body: v["body"].as_str().unwrap_or("").to_string(),
            body_encoding: v["body_encoding"].as_str().unwrap_or("utf8").to_string(),
            headers,
        }
    }

    /// Converts one slot of a batch response into a per-request result.
    ///
    /// A slot counts as successful only when its `ok` field is `true`.
    #[cfg(target_arch = "wasm32")]
    fn parse_fetch_slot(slot: Value) -> Result<FetchResponse, String> {
        if slot["ok"].as_bool() == Some(true) {
            Ok(fetch_response_from_json(&slot))
        } else {
            Err(slot["error"]
                .as_str()
                .unwrap_or("unknown error")
                .to_string())
        }
    }

    /// Reads the pending host response and parses it as a `FetchResponse`.
    #[cfg(target_arch = "wasm32")]
    fn read_fetch_response() -> Option<FetchResponse> {
        let expected = unsafe { get_host_response_len() };
        if expected <= 0 {
            return None;
        }
        let mut bytes = vec![0u8; expected as usize];
        let written = unsafe { get_host_response(bytes.as_mut_ptr() as i32, expected) };
        if written <= 0 {
            return None;
        }
        bytes.truncate(written as usize);
        let text = String::from_utf8_lossy(&bytes);
        let parsed: Value = serde_json::from_str(&text).ok()?;
        Some(fetch_response_from_json(&parsed))
    }
}
/// Configuration lookup through the host. On non-wasm32 targets lookups
/// always miss.
pub mod config {
    #[allow(unused_imports)]
    use super::*;

    /// Fetches the configuration value stored under `key`.
    ///
    /// Returns `None` when the host call reports a negative status, when no
    /// response is available, or when the response is not valid UTF-8. On
    /// non-wasm32 targets this always returns `None`.
    pub fn get(key: &str) -> Option<String> {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { get_config(key.as_ptr() as i32, key.len() as i32) };
            if status >= 0 {
                read_config_response()
            } else {
                None
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = key;
            None
        }
    }

    /// Reads the pending host response as a UTF-8 string.
    #[cfg(target_arch = "wasm32")]
    fn read_config_response() -> Option<String> {
        let expected = unsafe { get_host_response_len() };
        if expected <= 0 {
            return None;
        }
        let mut bytes = vec![0u8; expected as usize];
        let written = unsafe { get_host_response(bytes.as_mut_ptr() as i32, expected) };
        if written <= 0 {
            return None;
        }
        bytes.truncate(written as usize);
        match String::from_utf8(bytes) {
            Ok(text) => Some(text),
            Err(_) => None,
        }
    }
}
/// Object storage through the host. On non-wasm32 targets the functions are
/// stubs.
pub mod storage {
    #[allow(unused_imports)]
    use super::*;

    /// Downloads one object and returns the host response as a string.
    ///
    /// Returns `None` when the host call reports a negative status, when no
    /// response is available, or when the response is not valid UTF-8. On
    /// non-wasm32 targets this always returns `None`.
    ///
    /// NOTE(review): how the host encodes binary objects in this response is
    /// not visible here. `download_many` receives base64 per item.
    pub fn download(bucket: &str, key: &str) -> Option<String> {
        #[cfg(target_arch = "wasm32")]
        {
            let bucket_bytes = bucket.as_bytes();
            let key_bytes = key.as_bytes();
            let result = unsafe {
                s3_download(
                    bucket_bytes.as_ptr() as i32,
                    bucket_bytes.len() as i32,
                    key_bytes.as_ptr() as i32,
                    key_bytes.len() as i32,
                )
            };
            if result < 0 {
                return None;
            }
            read_storage_response()
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (bucket, key);
            None
        }
    }

    /// Asks the host for a presigned upload and returns its response string.
    ///
    /// The host ABI takes the expiry as an `i32`, so `expires_secs` values
    /// above `i32::MAX` are clamped to `i32::MAX` instead of wrapping (which
    /// could produce a negative expiry). Returns `None` on a negative host
    /// status or an unreadable response. On non-wasm32 targets this always
    /// returns `None`.
    pub fn presign_upload(
        bucket: &str,
        key: &str,
        content_type: &str,
        expires_secs: u64,
    ) -> Option<String> {
        #[cfg(target_arch = "wasm32")]
        {
            let bucket_bytes = bucket.as_bytes();
            let key_bytes = key.as_bytes();
            let ct_bytes = content_type.as_bytes();
            // Saturate rather than truncate: the value is at most i32::MAX
            // after `min`, so the cast below is lossless.
            let expires = expires_secs.min(i32::MAX as u64) as i32;
            let result = unsafe {
                s3_presign_upload(
                    bucket_bytes.as_ptr() as i32,
                    bucket_bytes.len() as i32,
                    key_bytes.as_ptr() as i32,
                    key_bytes.len() as i32,
                    ct_bytes.as_ptr() as i32,
                    ct_bytes.len() as i32,
                    expires,
                )
            };
            if result < 0 {
                return None;
            }
            read_storage_response()
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (bucket, key, content_type, expires_secs);
            None
        }
    }

    /// Reads the pending host response as a UTF-8 string.
    #[cfg(target_arch = "wasm32")]
    fn read_storage_response() -> Option<String> {
        let len = unsafe { get_host_response_len() };
        if len <= 0 {
            return None;
        }
        let mut buf = vec![0u8; len as usize];
        let read = unsafe { get_host_response(buf.as_mut_ptr() as i32, len) };
        if read <= 0 {
            return None;
        }
        buf.truncate(read as usize);
        String::from_utf8(buf).ok()
    }

    /// Downloads several `(bucket, key)` objects in one host call.
    ///
    /// When the host call fails or its response is not a JSON array, the
    /// result holds one `Err` per requested item. Otherwise each element of
    /// the host's array becomes one result, with the bytes base64-decoded.
    /// On non-wasm32 targets the result is always empty.
    pub fn download_many(items: &[(&str, &str)]) -> Vec<Result<Vec<u8>, String>> {
        #[cfg(target_arch = "wasm32")]
        {
            let payload_items: Vec<Value> = items
                .iter()
                .map(|(b, k)| serde_json::json!({ "bucket": b, "key": k }))
                .collect();
            let payload = serde_json::to_string(&payload_items).unwrap_or_else(|_| "[]".into());
            let bytes = payload.as_bytes();
            let rc =
                unsafe { super::s3_download_many(bytes.as_ptr() as i32, bytes.len() as i32) };
            if rc < 0 {
                return items
                    .iter()
                    .map(|_| Err("s3_download_many host call failed".to_string()))
                    .collect();
            }
            super::read_batch_response()
                .map(|v| v.into_iter().map(parse_download_slot).collect())
                .unwrap_or_else(|| {
                    items
                        .iter()
                        .map(|_| Err("malformed host response".to_string()))
                        .collect()
                })
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = items;
            vec![]
        }
    }

    /// Converts one slot of a batch download response into bytes or an error.
    ///
    /// A slot is successful only when `ok` is `true` and `data_b64` holds
    /// valid standard base64.
    #[cfg(target_arch = "wasm32")]
    fn parse_download_slot(slot: Value) -> Result<Vec<u8>, String> {
        if !slot["ok"].as_bool().unwrap_or(false) {
            return Err(slot["error"]
                .as_str()
                .unwrap_or("unknown error")
                .to_string());
        }
        let b64 = slot["data_b64"]
            .as_str()
            .ok_or_else(|| "missing data_b64".to_string())?;
        use base64::{engine::general_purpose, Engine};
        general_purpose::STANDARD
            .decode(b64)
            .map_err(|e| format!("base64 decode failed: {}", e))
    }
}
/// Image processing through the host. On non-wasm32 targets the function is
/// a stub.
pub mod image {
    #[allow(unused_imports)]
    use super::*;

    /// Asks the host to transform the object at `in_key` into a JPEG stored
    /// at `out_key` in the same `bucket`.
    ///
    /// `max_dim` and `quality` are forwarded to the host. The host ABI takes
    /// `max_dim` as an `i32`, so values above `i32::MAX` are clamped instead
    /// of wrapping to a negative number. Returns the host's non-negative
    /// result as a `u32`, or `None` when the host reports a negative status.
    /// On non-wasm32 targets this always returns `None`.
    ///
    /// NOTE(review): the meaning of the returned number is defined by the
    /// host and is not visible here.
    pub fn transform_jpeg(
        bucket: &str,
        in_key: &str,
        out_key: &str,
        max_dim: u32,
        quality: u8,
    ) -> Option<u32> {
        #[cfg(target_arch = "wasm32")]
        {
            let bucket_bytes = bucket.as_bytes();
            let in_key_bytes = in_key.as_bytes();
            let out_key_bytes = out_key.as_bytes();
            // Saturate rather than truncate; the cast is lossless after `min`.
            let max_dim_arg = max_dim.min(i32::MAX as u32) as i32;
            let result = unsafe {
                super::image_transform_jpeg(
                    bucket_bytes.as_ptr() as i32,
                    bucket_bytes.len() as i32,
                    in_key_bytes.as_ptr() as i32,
                    in_key_bytes.len() as i32,
                    out_key_bytes.as_ptr() as i32,
                    out_key_bytes.len() as i32,
                    max_dim_arg,
                    i32::from(quality),
                )
            };
            if result < 0 {
                return None;
            }
            Some(result as u32)
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (bucket, in_key, out_key, max_dim, quality);
            None
        }
    }
}
/// Key/value cache access through the host. On non-wasm32 targets the
/// functions are stubs.
pub mod redis {
    #[allow(unused_imports)]
    use super::*;

    /// Fetches the value stored under `key`.
    ///
    /// Returns `None` on a negative host status, when no response is
    /// available, or when the response is not valid UTF-8; these cases are
    /// not distinguished (see [`get_with_status`]). On non-wasm32 targets
    /// this always returns `None`.
    pub fn get(key: &str) -> Option<String> {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { redis_get(key.as_ptr() as i32, key.len() as i32) };
            if status >= 0 {
                read_redis_response()
            } else {
                None
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = key;
            None
        }
    }

    /// Stores `value` under `key`; `ttl_secs` is forwarded to the host.
    ///
    /// Returns `true` when the host call returns `0`. On non-wasm32 targets
    /// nothing is stored and `true` is returned.
    pub fn set(key: &str, value: &str, ttl_secs: i32) -> bool {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe {
                redis_set(
                    key.as_ptr() as i32,
                    key.len() as i32,
                    value.as_ptr() as i32,
                    value.len() as i32,
                    ttl_secs,
                )
            };
            status == 0
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = (key, value, ttl_secs);
            true
        }
    }

    /// Deletes `key`.
    ///
    /// Returns `true` when the host call returns `0`. On non-wasm32 targets
    /// this is a no-op returning `true`.
    pub fn del(key: &str) -> bool {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { redis_del(key.as_ptr() as i32, key.len() as i32) };
            status == 0
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = key;
            true
        }
    }

    /// Reads the pending host response as a UTF-8 string.
    #[cfg(target_arch = "wasm32")]
    fn read_redis_response() -> Option<String> {
        let expected = unsafe { get_host_response_len() };
        if expected <= 0 {
            return None;
        }
        let mut bytes = vec![0u8; expected as usize];
        let written = unsafe { get_host_response(bytes.as_mut_ptr() as i32, expected) };
        if written <= 0 {
            return None;
        }
        bytes.truncate(written as usize);
        match String::from_utf8(bytes) {
            Ok(text) => Some(text),
            Err(_) => None,
        }
    }

    /// Like [`get`], but distinguishes a missing key from a failure.
    ///
    /// Host status `1` maps to `Ok(None)`, a negative status maps to `Err`
    /// carrying the host's response text (or `"redis error"` when that is
    /// empty), and any other status maps to `Ok(Some(value))`. On non-wasm32
    /// targets this always returns `Ok(None)`.
    pub fn get_with_status(key: &str) -> Result<Option<String>, String> {
        #[cfg(target_arch = "wasm32")]
        {
            let code = unsafe { super::redis_get_status(key.as_ptr() as i32, key.len() as i32) };
            if code == 1 {
                return Ok(None);
            }
            // The response is only read once the "missing" case is ruled out.
            let body = super::read_host_response_string();
            if code >= 0 {
                Ok(Some(body))
            } else {
                Err(err_from_response(body))
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let _ = key;
            Ok(None)
        }
    }

    /// Fetches several keys in one host call.
    ///
    /// An empty `keys` slice returns an empty vector without calling the
    /// host. The keys are sent as a JSON array and the response is parsed as
    /// a JSON array of optional strings. A negative host status or an
    /// unparsable response yields `Err`. On non-wasm32 targets the result is
    /// one `None` per key.
    pub fn mget(keys: &[&str]) -> Result<Vec<Option<String>>, String> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        #[cfg(target_arch = "wasm32")]
        {
            let payload = serde_json::to_string(keys).map_err(|e| e.to_string())?;
            let code =
                unsafe { super::redis_mget(payload.as_ptr() as i32, payload.len() as i32) };
            let response = super::read_host_response_string();
            if code < 0 {
                return Err(err_from_response(response));
            }
            match serde_json::from_str(&response) {
                Ok(values) => Ok(values),
                Err(e) => Err(e.to_string()),
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            Ok(keys.iter().map(|_| None).collect())
        }
    }

    /// Uses the host's response text as the error message, with a generic
    /// fallback when the text is empty.
    #[cfg(target_arch = "wasm32")]
    fn err_from_response(body: String) -> String {
        match body.is_empty() {
            true => "redis error".to_string(),
            false => body,
        }
    }
}
/// Reads the pending host response as a UTF-8 string.
///
/// Returns an empty string when the host reports no response, when the read
/// yields no bytes, or when the bytes are not valid UTF-8.
#[cfg(target_arch = "wasm32")]
fn read_host_response_string() -> String {
    let expected = unsafe { get_host_response_len() };
    if expected > 0 {
        let mut bytes = vec![0u8; expected as usize];
        let written = unsafe { get_host_response(bytes.as_mut_ptr() as i32, expected) };
        if written > 0 {
            bytes.truncate(written as usize);
            if let Ok(text) = String::from_utf8(bytes) {
                return text;
            }
        }
    }
    String::new()
}
/// Miscellaneous host utilities, with local stand-ins on non-wasm32 targets.
pub mod util {
    #[allow(unused_imports)]
    use super::*;

    /// Returns the current time as a string.
    ///
    /// On wasm32 the string comes from the host (empty when the host call
    /// reports a negative status). On other targets a placeholder of the form
    /// `1970-01-01T00:00:00Z+<seconds since the Unix epoch>` is returned.
    pub fn current_time() -> String {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { super::current_time() };
            if status < 0 {
                String::new()
            } else {
                super::read_host_response_string()
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let since_epoch =
                std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
            // A clock set before the epoch is reported as zero seconds.
            let secs = match since_epoch {
                Ok(elapsed) => elapsed.as_secs(),
                Err(_) => 0,
            };
            format!("1970-01-01T00:00:00Z+{}", secs)
        }
    }

    /// Returns a UUID-shaped string.
    ///
    /// On wasm32 the string comes from the host (empty when the host call
    /// reports a negative status). On other targets a placeholder is built
    /// from the low 32 bits of the current time in nanoseconds and the low 16
    /// bits of the process id; the remaining groups are fixed.
    pub fn generate_uuid() -> String {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { super::generate_uuid() };
            if status < 0 {
                String::new()
            } else {
                super::read_host_response_string()
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            let time_low: u32 =
                match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
                    Ok(elapsed) => elapsed.as_nanos() as u32,
                    Err(_) => 0,
                };
            let pid_low = std::process::id() as u16;
            format!(
                "{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
                time_low, pid_low, 0u16, 0x8000u16, 0u64,
            )
        }
    }
}
/// Ambient execution context supplied by the host.
pub mod context {
    #[allow(unused_imports)]
    use super::*;

    /// Returns the tenant reported by the host.
    ///
    /// Yields an empty string when the host call returns a non-zero status,
    /// and always on non-wasm32 targets.
    pub fn tenant() -> String {
        #[cfg(target_arch = "wasm32")]
        {
            let status = unsafe { super::context_tenant() };
            if status == 0 {
                super::read_host_response_string()
            } else {
                String::new()
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            String::new()
        }
    }
}
/// Runtime glue behind the `handler!` macro; not part of the public API.
///
/// Reads `len` bytes of request JSON at address `ptr`, invokes `f` with the
/// parsed [`Request`], and returns the address of a newly allocated buffer
/// holding the serialized response: a 4-byte little-endian length prefix
/// followed by that many payload bytes. The buffer is not freed here.
///
/// A null `ptr`, a non-positive `len`, or unparsable JSON results in `f`
/// being called with a default request (`GET`, empty strings, `Null` body,
/// no auth, no job).
///
/// Aborts via `handle_alloc_error` if the response buffer cannot be
/// allocated.
#[doc(hidden)]
pub fn __run_handler<F>(ptr: i32, len: i32, f: F) -> i32
where
    F: FnOnce(Request) -> Response,
{
    // `slice::from_raw_parts` requires a non-null pointer and a sane length,
    // so reject obviously invalid input before touching memory. An empty
    // string fails to parse and takes the default-request path below.
    let request_json = if ptr == 0 || len <= 0 {
        String::new()
    } else {
        // SAFETY: `ptr` is non-null and `len` is positive. The caller is
        // trusted to pass a readable region of `len` bytes at `ptr`; that
        // part cannot be checked here.
        unsafe {
            let slice = std::slice::from_raw_parts(ptr as *const u8, len as usize);
            String::from_utf8_lossy(slice).into_owned()
        }
    };
    let request = Request::from_json(&request_json).unwrap_or_else(|| Request {
        method: "GET".to_string(),
        handler: String::new(),
        headers: HashMap::new(),
        body: Value::Null,
        raw_body: Vec::new(),
        tenant: String::new(),
        service: String::new(),
        auth: None,
        job: None,
    });
    let response = f(request);
    let response_bytes = response.into_data().into_bytes();
    // Length prefix (4 bytes) + payload; always non-zero, as `alloc` requires.
    let total = 4 + response_bytes.len();
    let layout = std::alloc::Layout::from_size_align(total, 1).expect("invalid layout");
    // SAFETY: `layout` has a non-zero size.
    let out_ptr = unsafe { std::alloc::alloc(layout) };
    if out_ptr.is_null() {
        // Writing through a null pointer would be undefined behaviour.
        std::alloc::handle_alloc_error(layout);
    }
    // SAFETY: `out_ptr` is non-null and points to `total` writable bytes; the
    // two copies cover exactly `4 + response_bytes.len()` bytes and the source
    // buffers do not overlap the fresh allocation.
    unsafe {
        let len_bytes = (response_bytes.len() as u32).to_le_bytes();
        std::ptr::copy_nonoverlapping(len_bytes.as_ptr(), out_ptr, 4);
        std::ptr::copy_nonoverlapping(
            response_bytes.as_ptr(),
            out_ptr.add(4),
            response_bytes.len(),
        );
    }
    out_ptr as i32
}
/// Emits the exported `alloc` function used to reserve guest memory.
///
/// The generated `alloc(size)` returns the address of a fresh allocation, or
/// `0` when `size` is negative or the allocator reports failure. A `size` of
/// zero still reserves one byte, because zero-sized requests are undefined
/// behaviour for the global allocator.
#[macro_export]
macro_rules! init {
    () => {
        #[no_mangle]
        pub extern "C" fn alloc(size: i32) -> i32 {
            // A negative size can never be satisfied; report failure as null
            // instead of panicking on the layout error.
            if size < 0 {
                return 0;
            }
            let bytes = if size == 0 { 1 } else { size as usize };
            match std::alloc::Layout::from_size_align(bytes, 1) {
                // SAFETY: `layout` has a non-zero size (at least one byte).
                Ok(layout) => unsafe { std::alloc::alloc(layout) as i32 },
                Err(_) => 0,
            }
        }
    };
}
/// Exports a handler under the symbol `$name`.
///
/// The generated `extern "C"` function takes the address and length of the
/// request JSON, forwards them to `__run_handler` together with the given
/// closure, and returns whatever address `__run_handler` returns.
///
/// ```ignore
/// handler!(my_handler, |req: Request| Response::empty());
/// ```
#[macro_export]
macro_rules! handler {
    ($name:ident, |$req:ident : Request| $body:expr) => {
        #[no_mangle]
        pub extern "C" fn $name(request_ptr: i32, request_len: i32) -> i32 {
            $crate::__run_handler(request_ptr, request_len, |$req: $crate::Request| $body)
        }
    };
}
/// Helper for migration handlers.
pub mod migrate {
    use super::{Request, Response};
    pub use cufflink_types::SchemaDiff;

    /// Decodes the request body as a [`SchemaDiff`] and passes it to `handler`.
    ///
    /// * Body does not deserialize: status-400 response whose error starts
    ///   with `on_migrate: failed to parse SchemaDiff payload`; `handler` is
    ///   not called.
    /// * `handler` returns `Err(msg)`: status-400 response with `msg`.
    /// * `handler` returns `Ok(())`: `{"ok": true}`.
    pub fn run<F>(req: Request, handler: F) -> Response
    where
        F: FnOnce(SchemaDiff) -> Result<(), String>,
    {
        let diff = match serde_json::from_value::<SchemaDiff>(req.body().clone()) {
            Ok(parsed) => parsed,
            Err(e) => {
                return Response::error(&format!(
                    "on_migrate: failed to parse SchemaDiff payload: {}",
                    e
                ));
            }
        };
        if let Err(reason) = handler(diff) {
            return Response::error(&reason);
        }
        Response::json(&serde_json::json!({"ok": true}))
    }
}
/// Convenience re-exports intended to be glob-imported by handler crates.
///
/// NOTE(review): the `context` module is public at the crate root but is not
/// re-exported here, unlike the other capability modules. Confirm whether
/// that omission is intentional.
pub mod prelude {
// Capability modules.
pub use crate::config;
pub use crate::db;
pub use crate::http;
pub use crate::image;
pub use crate::log;
pub use crate::migrate;
pub use crate::nats;
pub use crate::redis;
pub use crate::storage;
pub use crate::util;
// Core request/response types.
pub use crate::Auth;
pub use crate::JobContext;
pub use crate::Request;
pub use crate::Response;
// JSON helpers from serde_json used throughout handler code.
pub use serde_json::{json, Value};
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_request_parsing() {
let json = serde_json::to_string(&json!({
"method": "POST",
"handler": "checkout",
"headers": {"content-type": "application/json"},
"body": {"item": "widget", "qty": 3},
"tenant": "acme",
"service": "shop"
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
assert_eq!(req.method(), "POST");
assert_eq!(req.handler(), "checkout");
assert_eq!(req.tenant(), "acme");
assert_eq!(req.service(), "shop");
assert_eq!(req.body()["item"], "widget");
assert_eq!(req.body()["qty"], 3);
assert_eq!(req.header("content-type"), Some("application/json"));
}
#[test]
fn test_request_missing_fields() {
let json = r#"{"method": "GET"}"#;
let req = Request::from_json(json).unwrap();
assert_eq!(req.method(), "GET");
assert_eq!(req.handler(), "");
assert_eq!(req.tenant(), "");
assert_eq!(req.body(), &Value::Null);
assert!(req.raw_body().is_empty());
}
#[test]
fn test_request_raw_body_round_trip() {
use base64::{engine::general_purpose, Engine};
let raw = br#"{"type":"message.delivered","data":{"object":{"id":"AC1"}}}"#;
let json = serde_json::to_string(&json!({
"method": "POST",
"handler": "webhook",
"headers": {"content-type": "application/json"},
"body": serde_json::from_slice::<Value>(raw).unwrap(),
"body_raw_b64": general_purpose::STANDARD.encode(raw),
"tenant": "acme",
"service": "shop",
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
assert_eq!(req.raw_body(), raw);
}
#[test]
fn test_request_raw_body_invalid_base64_yields_empty() {
let json = serde_json::to_string(&json!({
"method": "POST",
"handler": "webhook",
"body": Value::Null,
"body_raw_b64": "not%base64!",
"tenant": "acme",
"service": "shop",
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
assert!(req.raw_body().is_empty());
}
#[test]
fn test_request_job_context_parsed_when_present() {
let json = serde_json::to_string(&json!({
"method": "POST",
"handler": "handle_run_ai",
"body": {"id": "item-1"},
"tenant": "acme",
"service": "asset",
"_job": {
"id": "11111111-1111-1111-1111-111111111111",
"attempt": 2,
"max_attempts": 3,
},
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
let job = req.job().expect("job context should be parsed");
assert_eq!(job.id, "11111111-1111-1111-1111-111111111111");
assert_eq!(job.attempt, 2);
assert_eq!(job.max_attempts, 3);
assert!(job.is_retry());
}
#[test]
fn test_request_job_context_absent_on_http_direct() {
let json = serde_json::to_string(&json!({
"method": "GET",
"handler": "list",
"body": Value::Null,
"tenant": "acme",
"service": "asset",
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
assert!(req.job().is_none());
}
#[test]
fn test_request_job_context_first_attempt_is_not_retry() {
let json = serde_json::to_string(&json!({
"method": "POST",
"handler": "handle_run_ai",
"body": Value::Null,
"tenant": "acme",
"service": "asset",
"_job": {
"id": "22222222-2222-2222-2222-222222222222",
"attempt": 1,
"max_attempts": 2,
},
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
let job = req.job().expect("job context should be parsed");
assert_eq!(job.attempt, 1);
assert!(!job.is_retry());
}
#[test]
fn test_request_job_context_rejects_malformed_envelope() {
let json = serde_json::to_string(&json!({
"method": "POST",
"handler": "handle_run_ai",
"body": Value::Null,
"tenant": "acme",
"service": "asset",
"_job": {
"id": "33333333-3333-3333-3333-333333333333",
"max_attempts": 2,
},
}))
.unwrap();
let req = Request::from_json(&json).unwrap();
assert!(
req.job().is_none(),
"missing attempt should yield None, not a partial JobContext"
);
}
#[test]
fn test_response_json() {
let resp = Response::json(&json!({"status": "ok", "count": 42}));
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed["status"], "ok");
assert_eq!(parsed["count"], 42);
}
#[test]
fn test_response_error() {
let resp = Response::error("something went wrong");
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed["__status"], 400);
assert_eq!(parsed["__body"]["error"], "something went wrong");
}
#[test]
fn test_response_not_found() {
let resp = Response::not_found("item not found");
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed["__status"], 404);
assert_eq!(parsed["__body"]["error"], "item not found");
}
#[test]
fn test_response_with_status() {
let resp = Response::json(&serde_json::json!({"ok": true})).with_status(201);
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed["__status"], 201);
assert_eq!(parsed["__body"]["ok"], true);
}
fn migrate_request(diff: serde_json::Value) -> Request {
let payload = serde_json::to_string(&json!({
"method": "POST",
"handler": "handle_on_migrate",
"headers": {},
"body": diff,
"tenant": "default",
"service": "logistics-service",
}))
.unwrap();
Request::from_json(&payload).unwrap()
}
#[test]
fn test_migrate_run_success() {
let req = migrate_request(json!({
"added_columns": [["pickups", "min"]],
"dropped_columns": [["pickups", "midpoint"]],
}));
let resp = migrate::run(req, |diff| {
assert!(diff.added_column("pickups", "min"));
assert!(diff.dropped_column("pickups", "midpoint"));
Ok(())
});
let parsed: Value = serde_json::from_str(&resp.into_data()).unwrap();
assert_eq!(parsed["ok"], true);
}
#[test]
fn test_migrate_run_handler_error() {
let req = migrate_request(json!({}));
let resp = migrate::run(req, |_| Err("backfill failed".into()));
let parsed: Value = serde_json::from_str(&resp.into_data()).unwrap();
assert_eq!(parsed["__status"], 400);
assert_eq!(parsed["__body"]["error"], "backfill failed");
}
#[test]
fn test_migrate_run_invalid_payload() {
let req = migrate_request(json!("not a diff"));
let resp = migrate::run(req, |_| {
panic!("closure should not be called for invalid payload")
});
let parsed: Value = serde_json::from_str(&resp.into_data()).unwrap();
assert_eq!(parsed["__status"], 400);
assert!(parsed["__body"]["error"]
.as_str()
.unwrap()
.contains("on_migrate: failed to parse SchemaDiff payload"));
}
#[test]
fn test_migrate_run_empty_diff() {
let req = migrate_request(json!({}));
let mut called = false;
let resp = migrate::run(req, |diff| {
assert!(diff.is_empty());
called = true;
Ok(())
});
assert!(called);
let parsed: Value = serde_json::from_str(&resp.into_data()).unwrap();
assert_eq!(parsed["ok"], true);
}
#[test]
fn test_response_200_no_wrapper() {
let resp = Response::json(&serde_json::json!({"data": "test"}));
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed["data"], "test");
assert!(parsed.get("__status").is_none());
}
#[test]
fn test_response_empty() {
let resp = Response::empty();
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed["ok"], true);
}
#[test]
fn test_response_text() {
let resp = Response::text("hello world");
let data = resp.into_data();
let parsed: Value = serde_json::from_str(&data).unwrap();
assert_eq!(parsed, "hello world");
}
#[test]
fn test_db_query_noop_on_native() {
let rows = db::query("SELECT 1");
assert!(rows.is_empty());
}
/// On a native (non-wasm) build `db::query_one` is expected to yield `None`.
#[test]
fn test_db_query_one_noop_on_native() {
    assert!(db::query_one("SELECT 1").is_none());
}
/// On a native (non-wasm) build `db::execute` is expected to report zero
/// affected rows.
#[test]
fn test_db_execute_noop_on_native() {
    assert_eq!(db::execute("INSERT INTO x VALUES (1)"), 0);
}
/// On a native (non-wasm) build `nats::publish` is expected to report success.
#[test]
fn test_nats_publish_noop_on_native() {
    assert!(nats::publish("test.subject", "payload"));
}
/// A request carrying a full `auth` object exposes every identity field,
/// the realm roles, and custom claims through `Request::auth`.
#[test]
fn test_request_with_auth() {
    // `Value`'s `Display` impl renders compact JSON, same as `serde_json::to_string`.
    let payload = json!({
        "method": "POST",
        "handler": "checkout",
        "headers": {},
        "body": {},
        "tenant": "acme",
        "service": "shop",
        "auth": {
            "sub": "user-123",
            "preferred_username": "john",
            "name": "John Doe",
            "email": "john@example.com",
            "realm_roles": ["admin", "manager"],
            "claims": {"department": "engineering"}
        }
    })
    .to_string();
    let req = Request::from_json(&payload).unwrap();
    let auth = req.auth().unwrap();

    // Identity fields.
    assert_eq!(auth.sub, "user-123");
    assert_eq!(auth.preferred_username.as_deref(), Some("john"));
    assert_eq!(auth.name.as_deref(), Some("John Doe"));
    assert_eq!(auth.email.as_deref(), Some("john@example.com"));

    // Realm roles: the two listed roles are present, an unlisted one is not.
    assert!(auth.has_role("admin"));
    assert!(auth.has_role("manager"));
    assert!(!auth.has_role("viewer"));

    // Custom claims.
    let department = auth.claim("department").and_then(|value| value.as_str());
    assert_eq!(department, Some("engineering"));
}
/// A request with no `auth` key has no authenticated identity.
#[test]
fn test_request_without_auth() {
    let req = Request::from_json(r#"{"method": "GET"}"#).unwrap();
    assert!(req.auth().is_none());
}
/// An explicit `"auth": null` is treated the same as a missing `auth` key.
#[test]
fn test_request_null_auth() {
    let payload = json!({
        "method": "GET",
        "auth": null
    })
    .to_string();
    let req = Request::from_json(&payload).unwrap();
    assert!(req.auth().is_none());
}
/// `require_auth` succeeds and returns the identity when `auth` is present.
#[test]
fn test_require_auth_success() {
    let payload = json!({
        "method": "GET",
        "auth": {"sub": "user-1", "realm_roles": [], "claims": {}}
    })
    .to_string();
    let req = Request::from_json(&payload).unwrap();
    assert!(req.require_auth().is_ok());
    let auth = req.require_auth().unwrap();
    assert_eq!(auth.sub, "user-1");
}
/// `require_auth` returns an error when the request carries no `auth`.
#[test]
fn test_require_auth_fails_when_unauthenticated() {
    let req = Request::from_json(r#"{"method": "GET"}"#).unwrap();
    assert!(req.require_auth().is_err());
}
/// On a native (non-wasm) build `http::fetch` is expected to yield `None`.
#[test]
fn test_http_fetch_noop_on_native() {
    assert!(http::fetch("GET", "https://example.com", &[], None).is_none());
}
/// On a native (non-wasm) build `http::get` is expected to yield `None`.
#[test]
fn test_http_get_noop_on_native() {
    assert!(http::get("https://example.com", &[]).is_none());
}
/// On a native (non-wasm) build `http::post` is expected to yield `None`.
#[test]
fn test_http_post_noop_on_native() {
    assert!(http::post("https://example.com", &[], "{}").is_none());
}
/// On a native (non-wasm) build `storage::download` is expected to yield `None`.
#[test]
fn test_storage_download_noop_on_native() {
    assert!(storage::download("my-bucket", "images/photo.jpg").is_none());
}
/// On a native (non-wasm) build `image::transform_jpeg` is expected to yield
/// `None`.
#[test]
fn test_image_transform_jpeg_noop_on_native() {
    assert!(image::transform_jpeg("my-bucket", "in.jpg", "out.jpg", 1024, 80).is_none());
}
/// Exercises `Auth::can` against exact and `area:*` wildcard permissions,
/// and `Auth::has_cufflink_role` against the `role_names` list.
#[test]
fn test_auth_permissions() {
    let payload = json!({
        "method": "POST",
        "handler": "test",
        "headers": {},
        "body": {},
        "tenant": "acme",
        "service": "shop",
        "auth": {
            "sub": "user-1",
            "realm_roles": ["admin"],
            "claims": {},
            "permissions": ["staff:create", "staff:view", "items:*"],
            "role_names": ["admin", "manager"]
        }
    })
    .to_string();
    let req = Request::from_json(&payload).unwrap();
    let auth = req.auth().unwrap();

    // (area, action, expected) — checked in the listed order.
    let permission_cases = [
        ("staff", "create", true),
        ("staff", "view", true),
        ("staff", "delete", false),
        // `items:*` grants every action on `items`.
        ("items", "create", true),
        ("items", "view", true),
        ("items", "delete", true),
        // No permission mentions `batches` at all.
        ("batches", "view", false),
    ];
    for (area, action, expected) in permission_cases {
        assert_eq!(auth.can(area, action), expected);
    }

    let role_cases = [("admin", true), ("manager", true), ("viewer", false)];
    for (role, expected) in role_cases {
        assert_eq!(auth.has_cufflink_role(role), expected);
    }
}
/// A lone `*` permission grants every action on every area.
#[test]
fn test_auth_super_wildcard() {
    let superuser = Auth {
        sub: String::from("user-1"),
        preferred_username: None,
        name: None,
        email: None,
        realm_roles: Vec::new(),
        claims: HashMap::new(),
        permissions: vec![String::from("*")],
        role_names: vec![String::from("superadmin")],
        is_service_account: false,
    };
    assert!(superuser.can("anything", "everything"));
    assert!(superuser.can("staff", "create"));
}
/// With no permissions and no role names, every check is denied.
#[test]
fn test_auth_empty_permissions() {
    let powerless = Auth {
        sub: String::from("user-1"),
        preferred_username: None,
        name: None,
        email: None,
        realm_roles: Vec::new(),
        claims: HashMap::new(),
        permissions: Vec::new(),
        role_names: Vec::new(),
        is_service_account: false,
    };
    assert!(!powerless.can("staff", "create"));
    assert!(!powerless.has_cufflink_role("admin"));
}
/// On a native (non-wasm) build `redis::get` is expected to yield `None`.
#[test]
fn test_redis_get_noop_on_native() {
    assert!(redis::get("some-key").is_none());
}
/// On a native (non-wasm) build `redis::set` is expected to report success.
#[test]
fn test_redis_set_noop_on_native() {
    assert!(redis::set("key", "value", 3600));
}
/// On a native (non-wasm) build `redis::del` is expected to report success.
#[test]
fn test_redis_del_noop_on_native() {
    assert!(redis::del("key"));
}
/// Covers the `FetchResponse` helpers: `is_success`, `is_base64`, and `json`.
#[test]
fn test_http_fetch_response_helpers() {
    // 200 + utf8 body: successful, not base64, body parses as JSON.
    let ok_resp = http::FetchResponse {
        status: 200,
        body: String::from(r#"{"key": "value"}"#),
        body_encoding: String::from("utf8"),
        headers: HashMap::new(),
    };
    assert!(ok_resp.is_success());
    assert!(!ok_resp.is_base64());
    let decoded = ok_resp.json().unwrap();
    assert_eq!(decoded["key"], "value");

    // 404 is not a success.
    let not_found = http::FetchResponse {
        status: 404,
        body: String::from("not found"),
        body_encoding: String::from("utf8"),
        headers: HashMap::new(),
    };
    assert!(!not_found.is_success());

    // A `base64` body encoding is reported by `is_base64`.
    let encoded = http::FetchResponse {
        status: 200,
        body: String::from("aW1hZ2VkYXRh"),
        body_encoding: String::from("base64"),
        headers: HashMap::new(),
    };
    assert!(encoded.is_base64());
}
/// Outside wasm, `context::tenant` is expected to return the empty string.
#[test]
fn context_tenant_returns_empty_outside_wasm() {
    let tenant = context::tenant();
    assert_eq!(tenant, "");
}
/// Outside wasm, `redis::get_with_status` is expected to return `Ok(None)`.
#[test]
fn redis_get_with_status_returns_none_outside_wasm() {
    let outcome = redis::get_with_status("any");
    assert_eq!(outcome, Ok(None));
}
/// `redis::mget` with no keys returns an empty result list.
#[test]
fn redis_mget_returns_empty_for_empty_input() {
    let outcome = redis::mget(&[]);
    assert_eq!(outcome, Ok(Vec::new()));
}
/// Outside wasm, `redis::mget` is expected to return one `None` per requested
/// key.
#[test]
fn redis_mget_returns_misses_outside_wasm() {
    let outcome = redis::mget(&["a", "b", "c"]);
    assert_eq!(outcome, Ok(vec![None, None, None]));
}
}