use crate::bridge::{self, AppServiceCommand};
use crate::error::LxAppError;
use crate::lx;
use crate::lxapp::LxApp;
use crate::{error, info};
use rong::{JSContext, JSResult, JSRuntime, JSValue, RongJSError, Source, error::HostError};
use rong_console as console;
use rong_http as http;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, mpsc};
use std::time::Duration;
use tokio::sync::oneshot;
#[path = "app.rs"]
mod app;
use crate::lifecycle::AppServiceEvent;
#[path = "event_bus.rs"]
pub(crate) mod event_bus;
#[path = "page.rs"]
mod page;
use crate::lifecycle::PageServiceEvent;
pub use page::PageSvc;
#[path = "plugin.rs"]
mod plugin;
#[path = "runtime_ctx.rs"]
mod runtime_ctx;
pub(crate) use runtime_ctx::set_app_svc_for_ctx;
use runtime_ctx::{register_app_ctx, remove_app_ctx, with_app_svc, with_page_svc_map};
/// Messages consumed by `lxapp_service_handler`, one per unit of work a worker performs.
///
/// Every variant carries the `LxApp` it concerns. Page-related variants carry the page
/// `path` plus an optional `page_instance_id`; lookups in the handler try the instance
/// id first and fall back to the path.
pub(crate) enum ServiceMessage {
/// Create a JS context for `lxapp`, initialise the app, page and plugin runtimes and
/// evaluate the logic entry source.
CreateAppSvc {
lxapp: Arc<LxApp>,
},
/// Drop the worker's current context, install the deny-all network guard and
/// unregister the app context. `ack_tx` is signalled once that is done.
TerminateAppSvc {
lxapp: Arc<LxApp>,
ack_tx: mpsc::Sender<()>,
},
/// Create a page service in the current context. The outcome is sent on `ack_tx`,
/// with any error rendered as a string.
CreatePage {
lxapp: Arc<LxApp>,
path: String,
page_instance_id: Option<String>,
ack_tx: oneshot::Sender<Result<(), String>>,
},
/// Remove a page service from the page map and clear its event-bus state. Ignored
/// when the current context belongs to a different LxApp session.
TerminatePage {
lxapp: Arc<LxApp>,
path: String,
page_instance_id: Option<String>,
},
/// Deliver an app-level lifecycle `event` to the app service.
/// NOTE(review): `args` is presumably a JSON payload - confirm against callers.
CallAppSvcEvent {
lxapp: Arc<LxApp>,
event: AppServiceEvent,
args: Option<String>,
},
/// Route a bridge command or a native call (see `PageSvcSource`) to a page service.
CallPageSvc {
lxapp: Arc<LxApp>,
path: String,
page_instance_id: Option<String>,
source: PageSvcSource,
},
/// Deliver a page-level lifecycle `event` to a page service.
CallPageSvcEvent {
lxapp: Arc<LxApp>,
path: String,
page_instance_id: Option<String>,
event: PageServiceEvent,
args: Option<String>,
},
/// Dispatch an app bus event inside the current context (same-session contexts only).
DispatchAppBusEvent {
lxapp: Arc<LxApp>,
event: event_bus::AppBusEvent,
},
/// Evaluate `script` in the logic runtime and send the JSON-encoded result, or the
/// error, on `tx`.
Eval {
lxapp: Arc<LxApp>,
script: String,
tx: oneshot::Sender<Result<String, LxAppError>>,
},
}
/// Origin of a `ServiceMessage::CallPageSvc` request.
pub enum PageSvcSource {
/// A bridge command; the worker hands it to `handle_bridge_source`.
Bridge {
message: crate::bridge::AppServiceCommand,
},
/// A call or event raised from native code, identified by `name` with optional
/// `args`; the worker hands it to `handle_native_source`.
Native {
name: String,
args: Option<String>, },
}
/// Wrapper around a single `ServiceMessage`.
///
/// NOTE(review): not referenced in this part of the file; presumably the envelope
/// queued to a worker - confirm against the worker loop.
pub(crate) struct WorkerService {
pub(crate) svc: ServiceMessage,
}
/// Delivers an app-level lifecycle event to the app service registered for `ctx`.
///
/// Only `OnLaunch`, `OnShow`, `OnHide` and `OnUserCaptureScreen` are forwarded; any
/// other event is dropped without logging. Failures (service not loaded, or the event
/// call failing) are logged against `appid` and never propagated.
async fn handle_app_service_event(
    worker_id: usize,
    ctx: &JSContext,
    appid: String,
    event: AppServiceEvent,
    args: Option<String>,
) {
    let svc = match with_app_svc(ctx, |svc| Ok(svc.clone())) {
        Ok(svc) => svc,
        Err(e) => {
            error!("[Worker {}] App service not loaded: {}", worker_id, e).with_appid(appid);
            return;
        }
    };

    let is_forwarded = matches!(
        event,
        AppServiceEvent::OnLaunch
            | AppServiceEvent::OnShow
            | AppServiceEvent::OnHide
            | AppServiceEvent::OnUserCaptureScreen
    );
    if !is_forwarded {
        return;
    }

    // `args` is owned and not needed afterwards, so it is moved rather than cloned.
    if let Err(e) = svc.call_event(ctx, event, args).await {
        error!(
            "[Worker {}] App service event '{}' failed, Error: {}",
            worker_id, event, e
        )
        .with_appid(appid);
    }
}
/// Renders a JS value as JSON text.
///
/// * `undefined` / `null` become `"null"`.
/// * Booleans, numbers and strings are encoded as the matching JSON scalar; a number
///   that `serde_json::Number::from_f64` rejects yields a `Runtime` error.
/// * Objects are serialised through `to_json_string`.
/// * Anything else falls back to `"null"`.
fn js_value_to_json_string(value: JSValue) -> Result<String, LxAppError> {
    if value.is_undefined() || value.is_null() {
        Ok("null".to_string())
    } else if value.is_boolean() {
        let flag: bool = value.into_value().try_into().map_err(LxAppError::from)?;
        Ok(flag.to_string())
    } else if value.is_number() {
        let raw: f64 = value.into_value().try_into().map_err(LxAppError::from)?;
        serde_json::Number::from_f64(raw)
            .map(|number| number.to_string())
            .ok_or_else(|| LxAppError::Runtime("eval returned invalid number".to_string()))
    } else if value.is_string() {
        let text: String = value.into_value().try_into().map_err(LxAppError::from)?;
        serde_json::to_string(&text).map_err(LxAppError::from)
    } else if let Some(object) = value.into_object() {
        object.to_json_string().map_err(LxAppError::from)
    } else {
        Ok("null".to_string())
    }
}
fn eval_error_from_rong(ctx: &JSContext, error: RongJSError) -> LxAppError {
if let Some(thrown) = error.thrown_value(ctx) {
if thrown.is_string() {
let value: Result<String, RongJSError> = thrown.into_value().try_into();
if let Ok(value) = value {
return LxAppError::RongJS(value);
}
} else if let Some(object) = thrown.into_object() {
let name = object
.get::<_, String>("name")
.unwrap_or_else(|_| "Error".to_string());
if let Ok(message) = object.get::<_, String>("message") {
return LxAppError::RongJS(format!("{name}: {message}"));
}
}
}
LxAppError::from(error)
}
/// Evaluates `script` in `ctx` and returns the result encoded as JSON text.
///
/// The script is first run through `eval(...)` inside an async arrow function, with
/// the source passed as a JSON string literal. If that fails and
/// `script_may_be_function_body` accepts the failure, the script is run a second time,
/// spliced in as the body of an async arrow function. Errors are converted with
/// `eval_error_from_rong`; when the second attempt runs, its error is the one reported.
async fn eval_logic_script(ctx: &JSContext, script: &str) -> Result<String, LxAppError> {
    let expression_json = serde_json::to_string(script).map_err(LxAppError::from)?;
    let expression = format!(
        r#"(async () => {{
return await eval({expression_json});
}})()"#
    );

    let first_attempt = ctx
        .eval_async::<JSValue>(Source::from_bytes(expression))
        .await;
    let expression_error = match first_attempt {
        Ok(value) => return js_value_to_json_string(value),
        Err(failure) => failure,
    };

    if !script_may_be_function_body(script, &expression_error) {
        return Err(eval_error_from_rong(ctx, expression_error));
    }

    // Second attempt: the raw script becomes the function body.
    let body = format!(
        r#"(async () => {{
{script}
}})()"#
    );
    let value = ctx
        .eval_async::<JSValue>(Source::from_bytes(body))
        .await
        .map_err(|body_error| eval_error_from_rong(ctx, body_error))?;
    js_value_to_json_string(value)
}
/// Heuristic: should a script that failed as an `eval` expression be retried as a
/// function body?
///
/// Returns `true` only when the rendered error mentions "syntax" (case-insensitive)
/// and the script looks like statements rather than a single expression, i.e. after
/// leading whitespace it
/// * starts with `return`, or
/// * starts with one of the statement keywords `const`, `let`, `var`, `if`, `for`,
///   `while`, `try` on a keyword boundary (so `if(x)` and `try{` count, `iffy()` and
///   `letter` do not), or
/// * contains a `;` anywhere.
fn script_may_be_function_body(script: &str, expression_error: &RongJSError) -> bool {
    const STATEMENT_KEYWORDS: [&str; 7] = ["const", "let", "var", "if", "for", "while", "try"];

    let is_syntax_error = expression_error
        .to_string()
        .to_ascii_lowercase()
        .contains("syntax");
    if !is_syntax_error {
        return false;
    }

    let trimmed = script.trim_start();
    trimmed.starts_with("return")
        || STATEMENT_KEYWORDS
            .iter()
            .any(|keyword| starts_with_keyword(trimmed, keyword))
        || trimmed.contains(';')
}

/// Returns `true` when `text` begins with `keyword` and the keyword is not merely the
/// prefix of a longer identifier (the next character, if any, is not a letter, digit,
/// `_` or `$`).
fn starts_with_keyword(text: &str, keyword: &str) -> bool {
    text.strip_prefix(keyword).is_some_and(|rest| {
        !rest
            .chars()
            .next()
            .is_some_and(|c| c.is_alphanumeric() || c == '_' || c == '$')
    })
}
async fn handle_bridge_source(
page_svc: &PageSvc,
message: AppServiceCommand,
) -> Result<(), LxAppError> {
match message {
AppServiceCommand::Ready => {
page_svc.handle_bridge_ready().await;
Ok(())
}
AppServiceCommand::StateSnapshot { id, scope } => {
let bridge = page_svc.bridge();
match page_svc.get_state_snapshot(scope.as_deref()).await {
Ok(snapshot) => bridge.send_res_ok(page_svc, id, snapshot)?,
Err(err) => bridge.send_res_err(
page_svc,
id,
bridge::BRIDGE_INTERNAL_ERROR,
Some(err.to_string()),
None,
)?,
}
Ok(())
}
AppServiceCommand::Req {
id,
method,
params_json,
cancel_rx,
} => {
let bridge = page_svc.bridge();
match page_svc
.handle_req(&id, &method, params_json.as_deref(), cancel_rx)
.await
{
Ok(json) => bridge.send_res_ok(page_svc, id, json)?,
Err(err) => bridge.send_res_err(page_svc, id, &err.code, err.message, err.data)?,
}
Ok(())
}
AppServiceCommand::Notify {
method,
params_json,
} => {
page_svc
.handle_notify(&method, params_json.as_deref())
.await;
Ok(())
}
AppServiceCommand::ChOpen {
id,
topic,
params_json,
} => {
let bridge = page_svc.bridge();
match page_svc
.handle_ch_open(&id, &topic, params_json.as_deref())
.await
{
Ok(()) => bridge.send_ch_ack_ok(page_svc, id)?,
Err(err) => {
bridge.send_ch_ack_err(page_svc, id, &err.code, err.message, err.data)?
}
}
Ok(())
}
AppServiceCommand::ChData { id, payload_json } => {
if let Err(err) = page_svc.handle_ch_data(&id, &payload_json).await {
error!("channel '{}' data handler failed: {}", id, err.code)
.with_appid(page_svc.page.appid())
.with_path(page_svc.page.path());
}
Ok(())
}
AppServiceCommand::ChClose { id, code, reason } => {
page_svc
.handle_ch_close(&id, code.as_deref(), reason.as_deref())
.await;
Ok(())
}
AppServiceCommand::StateAck { scope, rev } => {
page_svc.handle_state_ack(scope, rev).await;
Ok(())
}
}
}
async fn handle_native_source(
page_svc: &PageSvc,
appid: String,
name: String,
args: Option<String>,
) {
let ctx = page_svc.get_ctx();
let page_svc_clone = page_svc.clone();
let name_clone = name.clone();
let task = async move {
if let Err(e) = page_svc_clone
.call_or_event_from_native(&ctx, &name, args.as_deref())
.await
{
crate::error!("PageInstance service call '{}' failed: {}", name_clone, e)
.with_appid(appid)
.with_path(page_svc_clone.page.path());
}
};
rong::spawn_local(task);
}
pub(crate) async fn lxapp_service_handler(
worker_id: usize,
runtime: JSRuntime,
message: ServiceMessage,
current_ctx: &mut Option<JSContext>,
) {
match message {
ServiceMessage::CreateAppSvc { lxapp } => {
let ctx = runtime.context();
register_app_ctx(&runtime, &ctx, &lxapp);
if let Err(e) = app::init(&ctx) {
error!(
"[Worker {}] Failed to initialize App runtime: {}",
worker_id, e
)
.with_appid(lxapp.appid.clone());
return;
}
if let Err(e) = page::init(&ctx) {
error!(
"[Worker {}] Failed to initialize PageInstance runtime: {}",
worker_id, e
)
.with_appid(lxapp.appid.clone());
return;
}
if let Err(e) = plugin::init(&ctx) {
error!(
"[Worker {}] Failed to initialize Plugin runtime: {}",
worker_id, e
)
.with_appid(lxapp.appid.clone());
return;
}
event_bus::init(&ctx);
let app_ctx = LxAppCtx::new(lxapp.clone());
console::set_trace_context(
&ctx,
console::ConsoleTraceContext {
namespace: Some(lxapp.appid.clone()),
scope: Some("appservice".to_string()),
},
);
http::set_network_access_guard(Box::new(app_ctx));
let _ = rong_modules::init(&ctx);
let _ = lx::init(&ctx);
crate::lx::extension::with_registered_extensions(|user_extensions| {
info!(
"[Worker {}] Initializing {} user-registered extensions",
worker_id,
user_extensions.len()
)
.with_appid(lxapp.appid.clone());
for (index, user_extension) in user_extensions.iter().enumerate() {
if let Err(e) = user_extension.init(&ctx) {
error!(
"[Worker {}] Failed to initialize user extension #{}: {}",
worker_id, index, e
)
.with_appid(lxapp.appid.clone());
}
}
});
info!("[Worker {}] Created JS context", worker_id).with_appid(lxapp.appid.clone());
match lxapp.logic_entry_source(&ctx).await {
Ok(Some(js)) => match ctx.eval::<()>(js) {
Ok(_) => {
info!("[Worker {}] Successfully loaded logic JS", worker_id)
.with_appid(lxapp.appid.clone());
}
Err(e) => {
info!("[Worker {}] eval logic JS failed: {}", worker_id, e)
.with_appid(lxapp.appid.clone());
}
},
Ok(None) => {
info!(
"[Worker {}] Logic disabled; skipping JS bootstrap",
worker_id
)
.with_appid(lxapp.appid.clone());
}
Err(e) => {
error!("[Worker {}] Failed to load logic source: {}", worker_id, e)
.with_appid(lxapp.appid.clone());
}
}
*current_ctx = Some(ctx.clone());
}
ServiceMessage::TerminateAppSvc { lxapp, ack_tx } => {
if current_ctx.is_some() {
if let Some(ctx) = current_ctx.as_ref() {
console::clear_trace_context(ctx);
}
*current_ctx = None;
info!("[Worker {}] Removed LxApp context ", worker_id)
.with_appid(lxapp.appid.clone());
}
http::set_network_access_guard(Box::new(DenyAllNetworkAccessGuard));
remove_app_ctx(&runtime, &lxapp.appid);
let _ = ack_tx.send(());
}
ServiceMessage::CreatePage {
lxapp,
path,
page_instance_id,
ack_tx,
} => {
let result = if let Some(ctx) = current_ctx.as_ref() {
match PageSvc::create_in_ctx(ctx, &path, page_instance_id.as_deref()).await {
Ok(()) => Ok(()),
Err(e) => {
let msg = e.to_string();
error!("[Worker {}] create_in_ctx failed: {}", worker_id, e)
.with_appid(lxapp.appid.clone())
.with_path(&path);
Err(msg)
}
}
} else {
let msg = "JS context not available".to_string();
error!("[Worker {}] create_in_ctx: {}", worker_id, msg)
.with_appid(lxapp.appid.clone())
.with_path(&path);
Err(msg)
};
let _ = ack_tx.send(result);
}
ServiceMessage::TerminatePage {
lxapp,
path,
page_instance_id,
} => {
if let Some(ctx) = current_ctx.as_ref() {
let same_app = LxApp::from_ctx(ctx)
.map(|ctx_app| ctx_app.session.id == lxapp.session.id)
.unwrap_or(false);
if !same_app {
info!(
"[Worker {}] Ignored TerminatePage for different LxApp instance",
worker_id
)
.with_appid(lxapp.appid.clone())
.with_path(path.clone());
return;
}
let page_svc = with_page_svc_map(ctx, |page_svc_map| {
let mut page_svc_map = page_svc_map.borrow_mut();
let page_svc = match page_instance_id.as_deref() {
Some(id) => page_svc_map.remove(id),
None => page_svc_map.get(&path).cloned(),
};
if let Some(page_svc) = page_svc.as_ref() {
let instance_id = page_svc.get_page().instance_id_string();
page_svc_map.remove(instance_id.as_str());
if page_svc_map.get(&path).is_some_and(|candidate| {
candidate.get_page().instance_id_string() == instance_id
}) {
page_svc_map.remove(&path);
}
}
Ok(page_svc)
})
.unwrap_or(None);
if page_svc.is_some() {
event_bus::clear_page(ctx, &path);
info!("[Worker {}] Removed page", worker_id)
.with_appid(lxapp.appid.clone())
.with_path(path);
}
}
}
ServiceMessage::CallAppSvcEvent { lxapp, event, args } => {
if let Some(ctx) = current_ctx.as_ref() {
let same_app = LxApp::from_ctx(ctx)
.map(|ctx_app| ctx_app.session.id == lxapp.session.id)
.unwrap_or(false);
if same_app {
let ctx = ctx.clone();
let appid = lxapp.appid.clone();
rong::spawn_local(async move {
handle_app_service_event(worker_id, &ctx, appid, event, args).await;
});
}
}
}
ServiceMessage::CallPageSvc {
lxapp,
path,
page_instance_id,
source,
} => {
if let Some(ctx) = current_ctx.as_ref() {
match source {
PageSvcSource::Bridge { message } => {
let page_svc = with_page_svc_map(ctx, |page_svc_map| {
let page_svc_map = page_svc_map.borrow();
Ok(page_instance_id
.as_deref()
.and_then(|id| page_svc_map.get(id).cloned())
.or_else(|| page_svc_map.get(&path).cloned()))
})
.unwrap_or(None);
if let Some(page_svc) = page_svc {
if let Err(e) = handle_bridge_source(&page_svc, message).await {
error!("[Worker {}] Handle bridge message error: {}", worker_id, e)
.with_appid(lxapp.appid.clone())
.with_path(path.clone());
}
} else {
info!(
"[Worker {}] Dropping bridge message: page service not loaded",
worker_id
)
.with_appid(lxapp.appid.clone())
.with_path(path);
}
}
PageSvcSource::Native { name, args } => {
let page_svc = with_page_svc_map(ctx, |page_svc_map| {
let page_svc_map = page_svc_map.borrow();
Ok(page_instance_id
.as_deref()
.and_then(|id| page_svc_map.get(id).cloned())
.or_else(|| page_svc_map.get(&path).cloned()))
})
.unwrap_or(None);
if let Some(page_svc) = page_svc {
handle_native_source(&page_svc, lxapp.appid.clone(), name, args).await;
} else {
info!(
"[Worker {}] Dropping native call: page service not loaded",
worker_id
)
.with_appid(lxapp.appid.clone())
.with_path(path);
}
}
}
}
}
ServiceMessage::CallPageSvcEvent {
lxapp,
path,
page_instance_id,
event,
args,
} => {
if let Some(ctx) = current_ctx.as_ref() {
let page_svc = with_page_svc_map(ctx, |page_svc_map| {
let page_svc_map = page_svc_map.borrow();
Ok(page_instance_id
.as_deref()
.and_then(|id| page_svc_map.get(id).cloned())
.or_else(|| page_svc_map.get(&path).cloned()))
})
.unwrap_or(None);
if let Some(page_svc) = page_svc {
if let Err(e) = page_svc.call_page_event(ctx, event, args.as_deref()).await {
error!(
"[Worker {}] PageInstance event '{}' failed: {}",
worker_id, event, e
)
.with_appid(lxapp.appid.clone())
.with_path(path);
}
} else {
info!(
"[Worker {}] Dropping page event: page service not loaded",
worker_id
)
.with_appid(lxapp.appid.clone())
.with_path(path);
}
}
}
ServiceMessage::DispatchAppBusEvent { lxapp, event } => {
if let Some(ctx) = current_ctx.as_ref() {
let same_app = LxApp::from_ctx(ctx)
.map(|ctx_app| ctx_app.session.id == lxapp.session.id)
.unwrap_or(false);
if same_app {
let ctx = ctx.clone();
let appid = lxapp.appid.clone();
rong::spawn_local(async move {
if let Err(e) = event_bus::dispatch_app_bus_event(&ctx, &event).await {
error!(
"[Worker {}] Dispatch app bus event failed: {}",
worker_id, e
)
.with_appid(appid);
}
});
}
}
}
ServiceMessage::Eval { lxapp, script, tx } => {
let result = if let Some(ctx) = current_ctx.as_ref() {
let same_app = LxApp::from_ctx(ctx)
.map(|ctx_app| ctx_app.session.id == lxapp.session.id)
.unwrap_or(false);
if same_app {
eval_logic_script(ctx, &script).await
} else {
Err(LxAppError::Runtime(format!(
"logic runtime is bound to a different lxapp than {}",
lxapp.appid
)))
}
} else {
Err(LxAppError::Runtime(format!(
"logic runtime is not ready for {}",
lxapp.appid
)))
};
let _ = tx.send(result);
}
}
}
/// Reserves a free worker for `lxapp` and asks it to create the app service.
///
/// The assignment is keyed by the address of the `LxApp` behind the `Arc`. If that key
/// already has a worker, nothing is sent and `Ok(())` is returned.
///
/// # Errors
/// * `LxAppError::ResourceExhausted` when no free worker is left.
/// * The converted send error when `CreateAppSvc` cannot be queued; the reservation is
///   rolled back first (mapping removed, worker returned to the front of the queue).
///
/// # Panics
/// Panics if either mutex is poisoned.
pub(crate) fn create_app_svc(
    lxapp: Arc<crate::lxapp::LxApp>,
    sender: &mpsc::Sender<ServiceMessage>,
    instance_assignments: &Arc<Mutex<HashMap<usize, usize>>>,
    free_workers: &Arc<Mutex<VecDeque<usize>>>,
) -> Result<(), LxAppError> {
    let appid = lxapp.appid.clone();
    let key = lxapp.as_ref() as *const _ as usize;

    {
        let assignments = instance_assignments.lock().unwrap();
        if assignments.contains_key(&key) {
            info!("Reusing existing worker for app {}", appid);
            return Ok(());
        }
    }

    // `pop_front` already reports an empty queue as `None`, so there is no separate
    // emptiness check and no `unwrap`.
    let worker_id = free_workers.lock().unwrap().pop_front().ok_or_else(|| {
        LxAppError::ResourceExhausted("No available workers for new mini-app".to_string())
    })?;

    instance_assignments.lock().unwrap().insert(key, worker_id);
    if let Err(e) = sender.send(ServiceMessage::CreateAppSvc { lxapp }) {
        // Undo the reservation so the worker is not lost.
        instance_assignments.lock().unwrap().remove(&key);
        free_workers.lock().unwrap().push_front(worker_id);
        return Err(e.into());
    }

    info!("Assigned dedicated worker {} to app {}", worker_id, appid);
    Ok(())
}
/// Asks the worker assigned to `lxapp_arc` to tear down its app service, then returns
/// the worker to the free pool.
///
/// Does nothing (and returns `Ok(())`) when the app has no worker mapping. Otherwise a
/// `TerminateAppSvc` message is sent and the acknowledgement is awaited for up to three
/// seconds, blocking the calling thread. The worker is released whether or not the
/// acknowledgement arrives.
///
/// # Errors
/// Returns the converted send error when the message cannot be queued; in that case
/// the mapping is left in place.
///
/// # Panics
/// Panics if either mutex is poisoned.
pub(crate) fn terminate_app_svc(
    lxapp_arc: Arc<LxApp>,
    sender: &mpsc::Sender<ServiceMessage>,
    instance_assignments: &Arc<Mutex<HashMap<usize, usize>>>,
    free_workers: &Arc<Mutex<VecDeque<usize>>>,
) -> Result<(), LxAppError> {
    let appid = lxapp_arc.appid.clone();
    let key = lxapp_arc.as_ref() as *const _ as usize;

    let has_worker = instance_assignments.lock().unwrap().contains_key(&key);
    if !has_worker {
        info!(
            "No active worker mapping for app {}; skipping terminate",
            appid
        );
        return Ok(());
    }

    let (ack_tx, ack_rx) = mpsc::channel();
    sender.send(ServiceMessage::TerminateAppSvc {
        lxapp: lxapp_arc,
        ack_tx,
    })?;

    if ack_rx.recv_timeout(Duration::from_secs(3)).is_ok() {
        info!("Terminate ACK received").with_appid(appid.clone());
    } else {
        error!("Terminate ACK timeout; forcing release").with_appid(appid.clone());
    }

    // Take the mapping out in its own statement so the assignments lock is released
    // before the free-worker lock is taken.
    let released = instance_assignments.lock().unwrap().remove(&key);
    if let Some(worker_id) = released {
        free_workers.lock().unwrap().push_back(worker_id);
        info!("Released dedicated worker {} from app {}", worker_id, appid);
    }
    Ok(())
}
/// Network access guard bound to one `LxApp`.
///
/// Holds the app whose `is_domain_allowed` policy decides each `check_access` call.
#[derive(Clone)]
struct LxAppCtx {
lxapp: Arc<LxApp>,
}
/// Network access guard that rejects every domain.
///
/// Installed by the `TerminateAppSvc` handler in place of the app's own guard.
#[derive(Debug)]
struct DenyAllNetworkAccessGuard;
impl http::NetworkAccessGuard for DenyAllNetworkAccessGuard {
/// Always fails with a permission-denied error, whatever the domain.
fn check_access(&self, _domain: &str) -> JSResult<()> {
Err(network_access_denied_error("network access is denied"))
}
}
impl LxAppCtx {
/// Creates a guard that consults `lxapp` for domain checks.
pub fn new(lxapp: Arc<LxApp>) -> Self {
Self { lxapp }
}
}
/// Debug output shows only the app id, not the whole `LxApp`.
impl std::fmt::Debug for LxAppCtx {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LxAppCtx")
.field("appid", &self.lxapp.appid)
.finish()
}
}
impl http::NetworkAccessGuard for LxAppCtx {
    /// Allows `domain` only when the wrapped app's `is_domain_allowed` accepts it;
    /// otherwise fails with a permission-denied error that names the domain.
    fn check_access(&self, domain: &str) -> JSResult<()> {
        if !self.lxapp.is_domain_allowed(domain) {
            let detail = format!("domain '{domain}' is not allowed by lxapp security policy");
            return Err(network_access_denied_error(detail));
        }
        Ok(())
    }
}
/// Builds the error returned by the network access guards in this file.
///
/// The result is a `HostError` with code `E_PERMISSION_DENIED` and message
/// "Permission denied", carrying `bizCode` 3000 and the given `detail` as data.
fn network_access_denied_error(detail: impl AsRef<str>) -> RongJSError {
HostError::new(rong::error::E_PERMISSION_DENIED, "Permission denied")
.with_data(rong::err_data!({ bizCode: (3000), detail: (detail.as_ref()) }))
.into()
}