mumuprocess/
lib.rs

1// src/lib.rs
2// src/lib.rs (final, complete, with process:env and process:envs)
3
4use core_mumu::{
5    parser::interpreter::Interpreter,
6    parser::types::{Value, FunctionValue},
7    parser::lexer::tokenize,
8    parser::core::driver::parse_tokens,
9};
10use std::ffi::c_void;
11use std::{
12    sync::{
13        Arc,
14        Mutex,
15        mpsc::{channel, Receiver, Sender},
16        atomic::{AtomicUsize, Ordering},
17    },
18    collections::HashMap,
19    env,
20};
21use lazy_static::lazy_static;
22use indexmap::IndexMap;
23use whoami;
24
/// Process-wide counter that test harnesses or other code can use to track
/// active tasks.
///
/// It starts at zero. Nothing in the visible part of this file modifies it;
/// `ProcessManager` keeps its own separate `active_count`.
pub static ACTIVE_TASKS: AtomicUsize = AtomicUsize::new(0);
27
28// If you’re on Unix, get the actual UID; otherwise fallback to 0
29#[cfg(unix)]
30fn get_uid() -> i32 {
31    use nix::unistd::getuid;
32    getuid().as_raw() as i32
33}
34#[cfg(not(unix))]
35fn get_uid() -> i32 {
36    0
37}
38
39// Internal types for task/process manager
40#[allow(dead_code)]
41enum ProcessMessage {
42    Ok(usize, Value),
43    Err(usize, String),
44}
45
46struct ProcessTask {
47    callback: Box<FunctionValue>,
48    done: bool,
49}
50
51struct ProcessManager {
52    _next_id: usize,
53    tasks: HashMap<usize, ProcessTask>,
54    _tx: Sender<ProcessMessage>,
55    rx: Receiver<ProcessMessage>,
56    active_count: AtomicUsize,
57}
58
59/// Invoke a user callback with one argument using only the public API.
60/// This avoids depending on non-exported internal helpers.
61fn call_user_callback_public(
62    interp: &mut Interpreter,
63    cb: Box<FunctionValue>,
64    arg: Value,
65) -> Result<(), String> {
66    // Use very unlikely temp names to avoid collisions
67    const CB_VAR: &str = "__mumu_process_cb";
68    const ARG_VAR: &str = "__mumu_process_arg";
69
70    // Bind the callback and its argument into the interpreter
71    interp.set_variable(CB_VAR, Value::Function(cb));
72    interp.set_variable(ARG_VAR, arg);
73
74    // Parse and execute: __mumu_process_cb(__mumu_process_arg);
75    let src = format!("{CB_VAR}({ARG_VAR});");
76    let toks = tokenize(&src, interp.is_verbose()).map_err(|e| e.to_string())?;
77    let stmts = parse_tokens(&toks, interp.is_verbose()).map_err(|e| e.to_string())?;
78    for stmt in &stmts {
79        let _ = interp.exec_statement(stmt)?; // propagate any error
80    }
81    Ok(())
82}
83
84impl ProcessManager {
85    fn new() -> Self {
86        let (tx, rx) = channel();
87        Self {
88            _next_id: 0,
89            tasks: HashMap::new(),
90            _tx: tx,
91            rx,
92            active_count: AtomicUsize::new(0),
93        }
94    }
95
96    fn poll_events(&mut self, interp: &mut Interpreter) {
97        while let Ok(msg) = self.rx.try_recv() {
98            match msg {
99                ProcessMessage::Ok(id, data_val) => {
100                    if let Some(t) = self.tasks.get_mut(&id) {
101                        if !t.done {
102                            t.done = true;
103                            if let Err(e) = call_user_callback_public(interp, t.callback.clone(), data_val) {
104                                eprintln!("[process] callback error: {e}");
105                            }
106                        }
107                    }
108                }
109                ProcessMessage::Err(id, err_str) => {
110                    if let Some(t) = self.tasks.get_mut(&id) {
111                        if !t.done {
112                            t.done = true;
113                            let mut map = IndexMap::new();
114                            map.insert("error".to_string(), Value::SingleString(err_str));
115                            let final_val = Value::KeyedArray(map);
116                            if let Err(e) = call_user_callback_public(interp, t.callback.clone(), final_val) {
117                                eprintln!("[process] callback error: {e}");
118                            }
119                        }
120                    }
121                }
122            }
123        }
124
125        let before = self.tasks.len();
126        self.tasks.retain(|_, t| !t.done);
127        let removed = before.saturating_sub(self.tasks.len());
128        if removed > 0 {
129            self.active_count.fetch_sub(removed, Ordering::SeqCst);
130        }
131    }
132
133    fn count_tasks(&self) -> usize {
134        self.active_count.load(Ordering::SeqCst)
135    }
136}
137
138// Not currently called, so mark it unused to avoid warnings
139#[allow(dead_code)]
140fn process_spawn_bridge(
141    _interp: &mut Interpreter,
142    _args: Vec<Value>
143) -> Result<Value, String> {
144    Ok(Value::Bool(true))
145}
146
147/// process:info => gather system data in a callback
148fn process_info_bridge(
149    interp: &mut Interpreter,
150    mut args: Vec<Value>
151) -> Result<Value, String> {
152    if args.len() != 1 {
153        return Err(format!("process:info => expected 1 argument => callback, got {}", args.len()));
154    }
155    let callback_val = args.remove(0);
156    let callback_func = match callback_val {
157        Value::Function(fb) => fb,
158        other => return Err(format!("process:info => first arg must be function, got {:?}", other)),
159    };
160
161    let mut map = IndexMap::new();
162    let pid_u32 = std::process::id();
163    map.insert("pid".to_string(), Value::Int(pid_u32 as i32));
164    map.insert("uid".to_string(), Value::Int(get_uid()));
165    map.insert("username".to_string(), Value::SingleString(whoami::username()));
166    map.insert("binary_name".to_string(), Value::SingleString("mumu".into()));
167    map.insert("event_loop_len".to_string(), Value::Int(42));
168    let info_val = Value::KeyedArray(map);
169
170    // Call the user callback using the public parsing/execution path
171    call_user_callback_public(interp, callback_func, info_val)?;
172
173    Ok(Value::Bool(true))
174}
175
176/// process:check_tasks => poll manager, return how many tasks remain
177fn process_check_tasks_bridge(
178    interp: &mut Interpreter,
179    _args: Vec<Value>
180) -> Result<Value, String> {
181    let mut mgr = PROCESS_MANAGER.lock().unwrap();
182    mgr.poll_events(interp);
183    let count = mgr.count_tasks();
184    Ok(Value::Int(count as i32))
185}
186
187// === NEW FUNCTIONS ===
188
189/// process:envs => returns a KeyedArray of all environment variables
190fn process_envs_bridge(
191    _interp: &mut Interpreter,
192    args: Vec<Value>
193) -> Result<Value, String> {
194    if !args.is_empty() {
195        return Err("process:envs does not take any arguments".to_string());
196    }
197    let mut map = IndexMap::new();
198    for (key, value) in env::vars() {
199        map.insert(key, Value::SingleString(value));
200    }
201    Ok(Value::KeyedArray(map))
202}
203
204/// process:env => returns the value of a specific environment variable as string, or "" if not set
205fn process_env_bridge(
206    _interp: &mut Interpreter,
207    mut args: Vec<Value>
208) -> Result<Value, String> {
209    if args.len() != 1 {
210        return Err("process:env expects exactly one argument (name of variable)".to_string());
211    }
212    let key = match args.remove(0) {
213        Value::SingleString(s) => s,
214        Value::StrArray(ref arr) if arr.len() == 1 => arr[0].clone(),
215        other => return Err(format!("process:env expects a string or single string array, got {:?}", other)),
216    };
217    let val = env::var(&key).unwrap_or_else(|_| "".to_string());
218    Ok(Value::SingleString(val))
219}
220
221lazy_static! {
222    static ref PROCESS_MANAGER: Mutex<ProcessManager> = Mutex::new(ProcessManager::new());
223}
224
225#[export_name = "Cargo_lock"]
226pub unsafe extern "C" fn cargo_lock(
227    interp_ptr: *mut c_void,
228    _extra_str: *const c_void,
229) -> i32 {
230    if interp_ptr.is_null() {
231        return 1;
232    }
233    let interp_ref = &mut *(interp_ptr as *mut Interpreter);
234
235    // Register process:info bridging:
236    let info_fn = Arc::new(Mutex::new(process_info_bridge));
237    interp_ref.register_dynamic_function("process:info", info_fn);
238    interp_ref.set_variable(
239        "process:info",
240        Value::Function(Box::new(FunctionValue::Named("process:info".to_string())))
241    );
242
243    // Register process:check_tasks bridging:
244    let check_fn = Arc::new(Mutex::new(process_check_tasks_bridge));
245    interp_ref.register_dynamic_function("process:check_tasks", check_fn);
246    interp_ref.set_variable(
247        "process:check_tasks",
248        Value::Function(Box::new(FunctionValue::Named("process:check_tasks".to_string())))
249    );
250
251    // Register process:envs
252    let envs_fn = Arc::new(Mutex::new(process_envs_bridge));
253    interp_ref.register_dynamic_function("process:envs", envs_fn.clone());
254    interp_ref.set_variable(
255        "process:envs",
256        Value::Function(Box::new(FunctionValue::Named("process:envs".to_string())))
257    );
258
259    // Register process:env
260    let env_fn = Arc::new(Mutex::new(process_env_bridge));
261    interp_ref.register_dynamic_function("process:env", env_fn.clone());
262    interp_ref.set_variable(
263        "process:env",
264        Value::Function(Box::new(FunctionValue::Named("process:env".to_string())))
265    );
266
267    // Add poller for background tasks if you have any
268    let poller = Arc::new(Mutex::new(move |interp: &mut Interpreter| {
269        let mut mgr = PROCESS_MANAGER.lock().unwrap();
270        mgr.poll_events(interp);
271        mgr.count_tasks()
272    }));
273    interp_ref.add_poller(poller);
274
275    0
276}
277
278// Value-to-string helper (updated to current Value variants)
279pub fn value_to_string(val: &Value) -> String {
280    match val {
281        Value::Int(x) => x.to_string(),
282        Value::IntArray(xs) => format!("{:?}", xs),
283        Value::Int2DArray(rows) => format!("{:?}", rows),
284        Value::Float(f) => f.to_string(),
285        Value::FloatArray(ff) => format!("{:?}", ff),
286        Value::Float2DArray(rows) => format!("{:?}", rows),
287        Value::StrArray(ss) => format!("{:?}", ss),
288        Value::KeyedArray(map) => {
289            let mut s = String::from("{ ");
290            let mut first = true;
291            for (k, v) in map.iter() {
292                if !first {
293                    s.push_str(", ");
294                }
295                first = false;
296                s.push_str(k);
297                s.push_str(": ");
298                s.push_str(&value_to_string(v));
299            }
300            s.push_str(" }");
301            s
302        }
303        Value::Function(_) => "[Function]".to_string(),
304        Value::Bool(b) => b.to_string(),
305        Value::BoolArray(bb) => format!("{:?}", bb),
306        Value::Placeholder => "_".to_string(),
307        Value::Undefined => "undefined".to_string(),
308        Value::SingleString(s) => format!("\"{}\"", s),
309        Value::Long(l) => l.to_string(),
310        Value::Stream(sh) => format!("<Stream id={}, label={}>", sh.stream_id, sh.label),
311        Value::Iterator(_) => "[Iterator]".to_string(),
312        Value::Tensor(_) => "[Tensor]".to_string(),
313        Value::MixedArray(items) => {
314            let inner = items.iter().map(|v| value_to_string(v)).collect::<Vec<_>>().join(", ");
315            format!("[{}]", inner)
316        }
317        Value::Ref(arc_mutex) => value_to_string(&arc_mutex.lock().unwrap()),
318        Value::Regex(rx) => format!("Regex(/{}{}/)", rx.pattern, rx.flags),
319    }
320}