1use core_mumu::{
5 parser::interpreter::Interpreter,
6 parser::types::{Value, FunctionValue},
7 parser::lexer::tokenize,
8 parser::core::driver::parse_tokens,
9};
10use std::ffi::c_void;
11use std::{
12 sync::{
13 Arc,
14 Mutex,
15 mpsc::{channel, Receiver, Sender},
16 atomic::{AtomicUsize, Ordering},
17 },
18 collections::HashMap,
19 env,
20};
21use lazy_static::lazy_static;
22use indexmap::IndexMap;
23use whoami;
24
25pub static ACTIVE_TASKS: AtomicUsize = AtomicUsize::new(0);
27
28#[cfg(unix)]
30fn get_uid() -> i32 {
31 use nix::unistd::getuid;
32 getuid().as_raw() as i32
33}
34#[cfg(not(unix))]
35fn get_uid() -> i32 {
36 0
37}
38
39#[allow(dead_code)]
41enum ProcessMessage {
42 Ok(usize, Value),
43 Err(usize, String),
44}
45
46struct ProcessTask {
47 callback: Box<FunctionValue>,
48 done: bool,
49}
50
51struct ProcessManager {
52 _next_id: usize,
53 tasks: HashMap<usize, ProcessTask>,
54 _tx: Sender<ProcessMessage>,
55 rx: Receiver<ProcessMessage>,
56 active_count: AtomicUsize,
57}
58
59fn call_user_callback_public(
62 interp: &mut Interpreter,
63 cb: Box<FunctionValue>,
64 arg: Value,
65) -> Result<(), String> {
66 const CB_VAR: &str = "__mumu_process_cb";
68 const ARG_VAR: &str = "__mumu_process_arg";
69
70 interp.set_variable(CB_VAR, Value::Function(cb));
72 interp.set_variable(ARG_VAR, arg);
73
74 let src = format!("{CB_VAR}({ARG_VAR});");
76 let toks = tokenize(&src, interp.is_verbose()).map_err(|e| e.to_string())?;
77 let stmts = parse_tokens(&toks, interp.is_verbose()).map_err(|e| e.to_string())?;
78 for stmt in &stmts {
79 let _ = interp.exec_statement(stmt)?; }
81 Ok(())
82}
83
84impl ProcessManager {
85 fn new() -> Self {
86 let (tx, rx) = channel();
87 Self {
88 _next_id: 0,
89 tasks: HashMap::new(),
90 _tx: tx,
91 rx,
92 active_count: AtomicUsize::new(0),
93 }
94 }
95
96 fn poll_events(&mut self, interp: &mut Interpreter) {
97 while let Ok(msg) = self.rx.try_recv() {
98 match msg {
99 ProcessMessage::Ok(id, data_val) => {
100 if let Some(t) = self.tasks.get_mut(&id) {
101 if !t.done {
102 t.done = true;
103 if let Err(e) = call_user_callback_public(interp, t.callback.clone(), data_val) {
104 eprintln!("[process] callback error: {e}");
105 }
106 }
107 }
108 }
109 ProcessMessage::Err(id, err_str) => {
110 if let Some(t) = self.tasks.get_mut(&id) {
111 if !t.done {
112 t.done = true;
113 let mut map = IndexMap::new();
114 map.insert("error".to_string(), Value::SingleString(err_str));
115 let final_val = Value::KeyedArray(map);
116 if let Err(e) = call_user_callback_public(interp, t.callback.clone(), final_val) {
117 eprintln!("[process] callback error: {e}");
118 }
119 }
120 }
121 }
122 }
123 }
124
125 let before = self.tasks.len();
126 self.tasks.retain(|_, t| !t.done);
127 let removed = before.saturating_sub(self.tasks.len());
128 if removed > 0 {
129 self.active_count.fetch_sub(removed, Ordering::SeqCst);
130 }
131 }
132
133 fn count_tasks(&self) -> usize {
134 self.active_count.load(Ordering::SeqCst)
135 }
136}
137
138#[allow(dead_code)]
140fn process_spawn_bridge(
141 _interp: &mut Interpreter,
142 _args: Vec<Value>
143) -> Result<Value, String> {
144 Ok(Value::Bool(true))
145}
146
147fn process_info_bridge(
149 interp: &mut Interpreter,
150 mut args: Vec<Value>
151) -> Result<Value, String> {
152 if args.len() != 1 {
153 return Err(format!("process:info => expected 1 argument => callback, got {}", args.len()));
154 }
155 let callback_val = args.remove(0);
156 let callback_func = match callback_val {
157 Value::Function(fb) => fb,
158 other => return Err(format!("process:info => first arg must be function, got {:?}", other)),
159 };
160
161 let mut map = IndexMap::new();
162 let pid_u32 = std::process::id();
163 map.insert("pid".to_string(), Value::Int(pid_u32 as i32));
164 map.insert("uid".to_string(), Value::Int(get_uid()));
165 map.insert("username".to_string(), Value::SingleString(whoami::username()));
166 map.insert("binary_name".to_string(), Value::SingleString("mumu".into()));
167 map.insert("event_loop_len".to_string(), Value::Int(42));
168 let info_val = Value::KeyedArray(map);
169
170 call_user_callback_public(interp, callback_func, info_val)?;
172
173 Ok(Value::Bool(true))
174}
175
176fn process_check_tasks_bridge(
178 interp: &mut Interpreter,
179 _args: Vec<Value>
180) -> Result<Value, String> {
181 let mut mgr = PROCESS_MANAGER.lock().unwrap();
182 mgr.poll_events(interp);
183 let count = mgr.count_tasks();
184 Ok(Value::Int(count as i32))
185}
186
187fn process_envs_bridge(
191 _interp: &mut Interpreter,
192 args: Vec<Value>
193) -> Result<Value, String> {
194 if !args.is_empty() {
195 return Err("process:envs does not take any arguments".to_string());
196 }
197 let mut map = IndexMap::new();
198 for (key, value) in env::vars() {
199 map.insert(key, Value::SingleString(value));
200 }
201 Ok(Value::KeyedArray(map))
202}
203
204fn process_env_bridge(
206 _interp: &mut Interpreter,
207 mut args: Vec<Value>
208) -> Result<Value, String> {
209 if args.len() != 1 {
210 return Err("process:env expects exactly one argument (name of variable)".to_string());
211 }
212 let key = match args.remove(0) {
213 Value::SingleString(s) => s,
214 Value::StrArray(ref arr) if arr.len() == 1 => arr[0].clone(),
215 other => return Err(format!("process:env expects a string or single string array, got {:?}", other)),
216 };
217 let val = env::var(&key).unwrap_or_else(|_| "".to_string());
218 Ok(Value::SingleString(val))
219}
220
221lazy_static! {
222 static ref PROCESS_MANAGER: Mutex<ProcessManager> = Mutex::new(ProcessManager::new());
223}
224
225#[export_name = "Cargo_lock"]
226pub unsafe extern "C" fn cargo_lock(
227 interp_ptr: *mut c_void,
228 _extra_str: *const c_void,
229) -> i32 {
230 if interp_ptr.is_null() {
231 return 1;
232 }
233 let interp_ref = &mut *(interp_ptr as *mut Interpreter);
234
235 let info_fn = Arc::new(Mutex::new(process_info_bridge));
237 interp_ref.register_dynamic_function("process:info", info_fn);
238 interp_ref.set_variable(
239 "process:info",
240 Value::Function(Box::new(FunctionValue::Named("process:info".to_string())))
241 );
242
243 let check_fn = Arc::new(Mutex::new(process_check_tasks_bridge));
245 interp_ref.register_dynamic_function("process:check_tasks", check_fn);
246 interp_ref.set_variable(
247 "process:check_tasks",
248 Value::Function(Box::new(FunctionValue::Named("process:check_tasks".to_string())))
249 );
250
251 let envs_fn = Arc::new(Mutex::new(process_envs_bridge));
253 interp_ref.register_dynamic_function("process:envs", envs_fn.clone());
254 interp_ref.set_variable(
255 "process:envs",
256 Value::Function(Box::new(FunctionValue::Named("process:envs".to_string())))
257 );
258
259 let env_fn = Arc::new(Mutex::new(process_env_bridge));
261 interp_ref.register_dynamic_function("process:env", env_fn.clone());
262 interp_ref.set_variable(
263 "process:env",
264 Value::Function(Box::new(FunctionValue::Named("process:env".to_string())))
265 );
266
267 let poller = Arc::new(Mutex::new(move |interp: &mut Interpreter| {
269 let mut mgr = PROCESS_MANAGER.lock().unwrap();
270 mgr.poll_events(interp);
271 mgr.count_tasks()
272 }));
273 interp_ref.add_poller(poller);
274
275 0
276}
277
278pub fn value_to_string(val: &Value) -> String {
280 match val {
281 Value::Int(x) => x.to_string(),
282 Value::IntArray(xs) => format!("{:?}", xs),
283 Value::Int2DArray(rows) => format!("{:?}", rows),
284 Value::Float(f) => f.to_string(),
285 Value::FloatArray(ff) => format!("{:?}", ff),
286 Value::Float2DArray(rows) => format!("{:?}", rows),
287 Value::StrArray(ss) => format!("{:?}", ss),
288 Value::KeyedArray(map) => {
289 let mut s = String::from("{ ");
290 let mut first = true;
291 for (k, v) in map.iter() {
292 if !first {
293 s.push_str(", ");
294 }
295 first = false;
296 s.push_str(k);
297 s.push_str(": ");
298 s.push_str(&value_to_string(v));
299 }
300 s.push_str(" }");
301 s
302 }
303 Value::Function(_) => "[Function]".to_string(),
304 Value::Bool(b) => b.to_string(),
305 Value::BoolArray(bb) => format!("{:?}", bb),
306 Value::Placeholder => "_".to_string(),
307 Value::Undefined => "undefined".to_string(),
308 Value::SingleString(s) => format!("\"{}\"", s),
309 Value::Long(l) => l.to_string(),
310 Value::Stream(sh) => format!("<Stream id={}, label={}>", sh.stream_id, sh.label),
311 Value::Iterator(_) => "[Iterator]".to_string(),
312 Value::Tensor(_) => "[Tensor]".to_string(),
313 Value::MixedArray(items) => {
314 let inner = items.iter().map(|v| value_to_string(v)).collect::<Vec<_>>().join(", ");
315 format!("[{}]", inner)
316 }
317 Value::Ref(arc_mutex) => value_to_string(&arc_mutex.lock().unwrap()),
318 Value::Regex(rx) => format!("Regex(/{}{}/)", rx.pattern, rx.flags),
319 }
320}