wlambda/
threads.rs

1// Copyright (c) 2020-2022 Weird Constructor <weirdconstructor@gmail.com>
2// This is a part of WLambda. See README.md and COPYING for details.
3
4/*!
5
6This module provides threading functionality for WLambda.
Its threading primitives come exclusively from the Rust standard library;
the only external crate used here is `fnv` (for the hash map in `AVal::Map`).
8
9If you want to implement or specialize thread creation please
10refer to the documentation of the `ThreadCreator` trait.
11
12*/
13
14use crate::vval::*;
15use crate::compiler::*;
16use crate::nvec::NVec;
17use crate::str_int::*;
18
19use std::rc::Rc;
20use std::cell::RefCell;
21use std::sync::{Arc, Mutex, RwLock, Condvar};
22use std::sync::mpsc::{Receiver, Sender, TryRecvError, RecvTimeoutError};
23use std::fmt::Formatter;
24
25use fnv::FnvHashMap;
26
/// User data that can be stored inside an `AVal` and therefore be
/// handed to other threads.
///
/// Implementors have to be `Send + Sync`; `AVal::Usr` keeps them
/// as `Box<dyn ThreadSafeUsr>`.
pub trait ThreadSafeUsr: Send + Sync {
    /// Builds the `VVal` that represents this user data.
    /// `AVal::to_vval` calls this for the `AVal::Usr` variant.
    fn to_vval(&self) -> VVal;
}
30
31impl std::fmt::Debug for dyn ThreadSafeUsr {
32    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
33        write!(f, "<<ThreadSafeUsr>>")
34    }
35}
36
/// AVal is a copy-by-value structure for storing the most
/// important data of VVals inside an atomic container (AtomicAVal).
///
/// You can create an AVal from a VVal and convert it back like this:
/// ```
/// use wlambda::*;
///
/// let av = {
///     let v = VVal::vec();
///     v.push(VVal::Int(1));
///     v.push(VVal::Int(2));
///     v.push(VVal::Int(3));
///
///     AVal::from_vval(&v)
/// };
///
/// // You get back the VVal like this:
///
/// assert_eq!(av.to_vval().s(), "$[1,2,3]");
/// ```
#[derive(Debug)]
pub enum AVal {
    /// No value. `from_vval` also produces this for every VVal it has
    /// no dedicated conversion for.
    None,
    /// An error value together with the formatted position of the
    /// original error.
    Err(Box<AVal>, String),
    /// A boolean.
    Bol(bool),
    /// A symbol, stored by its name.
    Sym(String),
    /// A string.
    Str(String),
    /// A byte vector.
    Byt(Vec<u8>),
    /// An integer.
    Int(i64),
    /// A float.
    Flt(f64),
    /// A list of values.
    Lst(Vec<AVal>),
    /// An optional value.
    Opt(Option<Box<AVal>>),
    /// A numerical vector of floats.
    FVec(NVec<f64>),
    /// A numerical vector of integers.
    IVec(NVec<i64>),
    /// A pair of two values.
    Pair(Box<(AVal, AVal)>),
    /// A map with string keys; the keys become symbols again in `to_vval`.
    Map(FnvHashMap<String, AVal>),
    /// A channel handle; converting back to a VVal clones the handle.
    Chan(AValChannel),
    /// A value slot handle; converting back to a VVal clones the handle.
    Slot(AtomicAValSlot),
    /// An atom handle; converting back to a VVal clones the handle.
    Atom(AtomicAVal),
    /// Thread safe user data, see `ThreadSafeUsr`.
    Usr(Box<dyn ThreadSafeUsr>),
}
80
81impl AVal {
82    /// Takes a path of indices and the start index of that path,
83    /// and sets the addressed slot to the given AVal.
84    /// This is used by `std:sync:atom:write_at`.
85    #[allow(dead_code)]
86    pub fn set_at_path(&mut self, path_idx: usize, pth: &VVal, av: AVal) {
87        match pth.at(path_idx).unwrap_or(VVal::None) {
88            VVal::Int(i) => {
89                if let AVal::Lst(ref mut v) = self {
90                    if (i as usize) < v.len() {
91                        v[i as usize] = av;
92                    } else {
93                        v.insert(i as usize, av);
94                    }
95                } else if let AVal::Pair(ref mut v) = self {
96                    let i = (i % 2).abs();
97                    if i == 0 { v.0 = av; }
98                    else      { v.1 = av; }
99                }
100            },
101            v => {
102                let key = v.s_raw();
103                if let AVal::Map(ref mut m) = self {
104                    m.insert(key, av);
105                }
106            },
107        }
108    }
109
110    /// Converts the AVal back to a VVal.
111    ///
112    /// ```
113    /// use wlambda::*;
114    /// assert_eq!(AVal::Sym(String::from("x")).to_vval().s(), ":x");
115    /// ```
116    pub fn to_vval(&self) -> VVal {
117        match self {
118            AVal::None => VVal::None,
119            AVal::Err(av, pos) => {
120                let v = VVal::vec();
121                v.push(av.to_vval());
122                v.push(VVal::new_str(pos));
123                VVal::Err(Rc::new(RefCell::new((v, SynPos::empty()))))
124            },
125            AVal::Bol(b)       => VVal::Bol(*b),
126            AVal::Int(i)       => VVal::Int(*i),
127            AVal::Flt(f)       => VVal::Flt(*f),
128            AVal::Sym(s)       => VVal::new_sym(s),
129            AVal::Str(s)       => VVal::new_str(s),
130            AVal::Byt(b)       => VVal::new_byt(b.clone()),
131            AVal::Atom(a)      => VVal::Usr(Box::new(a.clone())),
132            AVal::Chan(a)      => VVal::Usr(Box::new(a.clone())),
133            AVal::Slot(a)      => VVal::Usr(Box::new(a.clone())),
134            AVal::Opt(None)    => VVal::opt_none(),
135            AVal::Opt(Some(a)) => VVal::opt(a.to_vval()),
136            AVal::Pair(p)      => VVal::pair(p.0.to_vval(), p.1.to_vval()),
137            AVal::FVec(v)      => VVal::FVec(Box::new(*v)),
138            AVal::IVec(v)      => VVal::IVec(Box::new(*v)),
139            AVal::Usr(u)       => u.to_vval(),
140            AVal::Lst(l) => {
141                let v = VVal::vec();
142                for av in l.iter() {
143                    v.push(av.to_vval());
144                }
145                v
146            },
147            AVal::Map(m) => {
148                let mv = VVal::map();
149                for (k, v) in m.iter() {
150                    let k = s2sym(k);
151                    mv.set_key_sym(k, v.to_vval())
152                      .expect("AVal->VVal map not used more than once");
153                }
154                mv
155            },
156        }
157    }
158
159    /// Converts a VVal to an AVal.
160    ///
161    /// ```
162    /// use wlambda::*;
163    ///
164    /// let av = AVal::from_vval(&VVal::new_sym("x"));
165    /// if let AVal::Sym(s) = av {
166    ///     assert_eq!(s, "x");
167    /// } else {
168    ///     assert!(false);
169    /// }
170    /// ```
171    pub fn from_vval(v: &VVal) -> Self {
172        match v {
173            VVal::None => AVal::None,
174            VVal::Err(e) => {
175                let eb = e.borrow();
176                AVal::Err(
177                    Box::new(AVal::from_vval(&eb.0)),
178                    format!("{}", eb.1))
179            },
180            VVal::Bol(b)       => AVal::Bol(*b),
181            VVal::Sym(s)       => AVal::Sym(String::from(s.as_ref())),
182            VVal::Str(s)       => AVal::Str(s.as_ref().clone()),
183            VVal::Byt(b)       => AVal::Byt(b.as_ref().clone()),
184            VVal::Int(i)       => AVal::Int(*i),
185            VVal::Flt(f)       => AVal::Flt(*f),
186            VVal::Opt(None)    => AVal::Opt(None),
187            VVal::Opt(Some(v)) => AVal::Opt(Some(Box::new(AVal::from_vval(v)))),
188            VVal::FVec(v)      => AVal::FVec(**v),
189            VVal::IVec(v)      => AVal::IVec(**v),
190            VVal::Pair(p) =>
191                AVal::Pair(
192                    Box::new((
193                        AVal::from_vval(&p.0),
194                        AVal::from_vval(&p.1)))),
195            VVal::Lst(l) => {
196                let mut avec = vec![];
197                for vv in l.borrow().iter() {
198                    avec.push(AVal::from_vval(vv));
199                }
200                AVal::Lst(avec)
201            },
202            VVal::Map(m) => {
203                let mut amap =
204                    FnvHashMap::with_capacity_and_hasher(2, Default::default());
205                for (k, v) in m.borrow().iter() {
206                    amap.insert(String::from(k.as_ref()), AVal::from_vval(v));
207                }
208                AVal::Map(amap)
209            },
210            VVal::Usr(u) => {
211                let mut cl_ud = u.clone_ud();
212                if let Some(ud) = cl_ud.as_any().downcast_mut::<AtomicAVal>() {
213                    AVal::Atom(ud.clone())
214                } else if let Some(ud) = cl_ud.as_any().downcast_mut::<AValChannel>() {
215                    AVal::Chan(ud.clone())
216                } else if let Some(ud) = cl_ud.as_any().downcast_mut::<AtomicAValSlot>() {
217                    AVal::Slot(ud.clone())
218                } else if let Some(ud) = cl_ud.as_thread_safe_usr() {
219                    AVal::Usr(ud)
220                } else {
221                    AVal::None
222                }
223            },
224            _ => AVal::None,
225        }
226    }
227}
228
/// Wraps an AVal like this: `Arc<RwLock<AVal>>`.
/// An AtomicAVal is a thread safe container for VVal
/// data structures. It's used by WLambda functions like
/// `std:sync:atom:new`, `std:sync:atom:read` or `std:sync:atom:write`.
///
/// These containers are shared between the threads by passing them
/// to the threads at `std:thread:spawn`.
///
/// Usage sketch in WLambda:
///
/// ```text
/// !atom = std:sync:atom:new 10;
/// !channel = std:sync:mpsc:new[];
/// !thrd = std:spawn_thread $q{
///     !val = main.read;
///     main.write $[1,$[0,1],3];
///
///     main.read_at 0;
///     main.write_at $[1, 0] 320;
///     q.send $["done", 10];
///
/// } ${ main = atom, q = channel };
///
/// !item = queue.pop[];
/// .item = queue.pop_timeout 1000;
///
///
/// ```
///
/// NOTE(review): the sketch looks like an early design draft. `queue` is
/// never bound in it, the channel type of this module answers to
/// `recv`/`recv_timeout`/`try_recv`/`send` rather than `pop`/`pop_timeout`,
/// and the method dispatch of AtomicAVal only knows `read`, `swap` and
/// `write`. Confirm against the WLambda reference before relying on it.
#[derive(Clone, Debug)]
pub struct AtomicAVal(Arc<RwLock<AVal>>);
259
260impl Default for AtomicAVal {
261    fn default() -> Self { AtomicAVal::new() }
262}
263
/// The shared state behind an `AtomicAValSlot`: a single value guarded
/// by a mutex, plus two condition variables for the two directions of
/// waiting.
#[derive(Debug)]
pub struct AtomicValSlot {
    /// The stored value and a flag that is `true` while a sent value
    /// has not been received yet.
    val: Mutex<(AVal, bool)>,
    /// Notified by `send` after a value was stored, receivers wait on it.
    cv_send:  Condvar,
    /// Notified after a value was taken out of the slot, `wait_empty`
    /// waits on it.
    cv_recv:  Condvar,
}
270
/// A cloneable handle to an `AtomicValSlot`. All clones refer to the
/// same slot, because only the `Arc` is cloned.
#[derive(Clone, Debug)]
pub struct AtomicAValSlot(Arc<AtomicValSlot>);

/// The default slot is the empty slot created by `AtomicAValSlot::new`.
impl Default for AtomicAValSlot {
    fn default() -> Self { Self::new() }
}
277
278impl AtomicAValSlot {
279    pub fn new() -> Self {
280        Self(Arc::new(AtomicValSlot {
281            val: Mutex::new((AVal::None, false)),
282            cv_recv: Condvar::new(),
283            cv_send: Condvar::new(),
284        }))
285    }
286
287    pub fn wait_empty(&self, dur: Option<std::time::Duration>) -> VVal {
288        let mut guard =
289            match self.0.val.lock() {
290                Ok(guard) => guard,
291                Err(e) => {
292                    return
293                        VVal::err_msg(
294                            &format!(
295                                "Failed to receive, can't get lock: {}", e))
296                }
297            };
298
299        if !guard.1 {
300            return VVal::Bol(true);
301        }
302
303        if let Some(dur) = dur {
304            loop {
305                match self.0.cv_recv.wait_timeout(guard, dur) {
306                    Ok(wait_res) => {
307                        guard = wait_res.0;
308
309                        if wait_res.1.timed_out() {
310                            return VVal::Bol(false);
311
312                        } else if !guard.1 {
313                            return VVal::Bol(true);
314                        }
315                    },
316                    Err(e) => {
317                        return
318                            VVal::err_msg(
319                                &format!(
320                                    "Failed to receive, poison error: {}",
321                                    e));
322                    },
323                }
324            }
325        } else {
326            loop {
327                match self.0.cv_recv.wait(guard) {
328                    Ok(next_guard) => {
329                        guard = next_guard;
330
331                        if !guard.1 {
332                            return VVal::Bol(true);
333                        }
334                    },
335                    Err(e) => {
336                        return
337                            VVal::err_msg(
338                                &format!(
339                                    "Failed to receive, poison error: {}",
340                                    e));
341                    },
342                }
343            }
344        }
345    }
346
347    pub fn check_empty(&self) -> VVal {
348        match self.0.val.lock() {
349            Ok(guard) => VVal::Bol(!guard.1),
350            Err(e) =>
351                VVal::err_msg(
352                    &format!(
353                        "Failed to check empty, can't get lock: {}", e)),
354        }
355    }
356
357    pub fn send(&self, msg: &VVal) -> VVal {
358        match self.0.val.lock() {
359            Ok(mut guard) => {
360                *guard = (AVal::from_vval(msg), true);
361                self.0.cv_send.notify_one();
362                VVal::Bol(true)
363            },
364            Err(e) =>
365                VVal::err_msg(&format!("Failed to send, can't get lock: {}", e)),
366        }
367    }
368
369    pub fn recv_timeout(&self, dur: Option<std::time::Duration>) -> VVal {
370        let mut guard =
371            match self.0.val.lock() {
372                Ok(guard) => guard,
373                Err(e) => {
374                    return
375                        VVal::err_msg(
376                            &format!(
377                                "Failed to receive, can't get lock: {}", e))
378                }
379            };
380
381        if let Some(dur) = dur {
382            if guard.1 {
383                let val = guard.0.to_vval();
384                *guard = (AVal::None, false);
385                self.0.cv_recv.notify_one();
386                return VVal::opt(val);
387            }
388
389            loop {
390                match self.0.cv_send.wait_timeout(guard, dur) {
391                    Ok(wait_res) => {
392                        guard = wait_res.0;
393
394                        if wait_res.1.timed_out() {
395                            return VVal::opt_none();
396
397                        } else if guard.1 {
398                            let val = guard.0.to_vval();
399                            *guard = (AVal::None, false);
400                            self.0.cv_recv.notify_one();
401                            return VVal::opt(val);
402                        }
403                    },
404                    Err(e) => {
405                        return
406                            VVal::err_msg(
407                                &format!(
408                                    "Failed to receive, poison error: {}",
409                                    e));
410                    },
411                }
412            }
413        } else {
414            if guard.1 {
415                let val = guard.0.to_vval();
416                *guard = (AVal::None, false);
417                self.0.cv_recv.notify_one();
418                return val;
419            }
420
421            loop {
422                match self.0.cv_send.wait(guard) {
423                    Ok(next_guard) => {
424                        guard = next_guard;
425
426                        if guard.1 {
427                            let val = guard.0.to_vval();
428                            *guard = (AVal::None, false);
429                            self.0.cv_recv.notify_one();
430                            return val;
431                        }
432                    },
433                    Err(e) => {
434                        return
435                            VVal::err_msg(
436                                &format!(
437                                    "Failed to receive, poison error: {}",
438                                    e));
439                    },
440                }
441            }
442        }
443    }
444
445    pub fn try_recv(&self) -> VVal {
446        match self.0.val.lock() {
447            Ok(mut guard) => {
448                if guard.1 {
449                    let val = guard.0.to_vval();
450                    *guard = (AVal::None, false);
451                    self.0.cv_recv.notify_one();
452                    VVal::opt(val)
453                } else {
454                    VVal::opt_none()
455                }
456            },
457            Err(e) =>
458                VVal::err_msg(&format!("Failed to receive, can't get lock: {}", e)),
459        }
460    }
461}
462
463impl VValUserData for AtomicAValSlot {
464    fn as_any(&mut self) -> &mut dyn std::any::Any { self }
465    fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
466        let argv = env.argv_ref();
467        match key {
468            "recv" => Ok(self.recv_timeout(None)),
469            "try_recv" => Ok(self.try_recv()),
470            "recv_timeout" => {
471                if argv.len() != 1 {
472                    return
473                        Err(StackAction::panic_str(
474                            "recv_timeout method expects 1 argument".to_string(),
475                            None,
476                            env.argv()))
477                }
478
479                Ok(self.recv_timeout(Some(argv[0].to_duration()?)))
480            },
481            "send" => {
482                if argv.len() != 1 {
483                    return
484                        Err(StackAction::panic_str(
485                            "send method expects 1 argument".to_string(),
486                            None,
487                            env.argv()))
488                }
489
490                Ok(self.send(&argv[0]))
491            },
492            "check_empty" => Ok(self.check_empty()),
493            "wait_empty" => Ok(self.wait_empty(None)),
494            "wait_empty_timeout" => {
495                if argv.len() != 1 {
496                    return
497                        Err(StackAction::panic_str(
498                            "wait_empty_timeout method expects 1 argument".to_string(),
499                            None,
500                            env.argv()))
501                }
502
503                Ok(self.wait_empty(Some(argv[0].to_duration()?)))
504            },
505            _ => {
506                Err(StackAction::panic_str(
507                    format!("unknown method called: {}", key),
508                    None,
509                    env.argv()))
510            },
511        }
512    }
513    fn clone_ud(&self) -> Box<dyn VValUserData> {
514        Box::new(self.clone())
515    }
516}
517
/// A channel for AVals, built on a `std::sync::mpsc` channel.
///
/// Both ends are kept behind `Arc<Mutex<..>>`, so a clone of an
/// `AValChannel` shares the sender and the receiver with the original.
#[derive(Clone)]
pub struct AValChannel {
    /// The sending end of the channel.
    sender:     Arc<Mutex<Sender<AVal>>>,
    /// The receiving end of the channel.
    receiver:   Arc<Mutex<Receiver<AVal>>>,
}
523
/// The error of a failed attempt to fork the sender of an `AValChannel`.
/// It carries the description of the underlying lock failure.
#[derive(Clone, Debug)]
pub struct ForkSenderError(String);

impl std::fmt::Display for ForkSenderError {
    fn fmt(&self, out: &mut std::fmt::Formatter) -> std::fmt::Result {
        out.write_str("Failed to fork sender, can't get lock: ")?;
        out.write_str(&self.0)
    }
}
532
533impl AValChannel {
534    pub(crate) fn new_direct() -> AValChannel {
535        let (send, recv) = std::sync::mpsc::channel();
536        Self {
537            sender:     Arc::new(Mutex::new(send)),
538            receiver:   Arc::new(Mutex::new(recv)),
539        }
540    }
541
    /// Creates a new channel and returns it wrapped as user data `VVal`.
    pub fn new_vval() -> VVal {
        VVal::Usr(Box::new(Self::new_direct()))
    }
545
546    pub(crate) fn fork_sender_direct(&self) -> Result<Self, ForkSenderError> {
547        match self.sender.lock() {
548            Ok(guard) => {
549                let receiver = self.receiver.clone();
550                let new_sender : Sender<AVal> = guard.clone();
551                Ok(Self {
552                    sender: Arc::new(Mutex::new(new_sender)),
553                    receiver,
554                })
555            },
556            Err(e) => Err(ForkSenderError(format!("{}", e))),
557        }
558    }
559
560    pub fn fork_sender(&self) -> VVal {
561        match self.fork_sender_direct() {
562            Ok(channel) => VVal::Usr(Box::new(channel)),
563            Err(ForkSenderError(msg)) =>
564                VVal::err_msg(
565                    &format!("Failed to fork sender, can't get lock: {}", msg)),
566        }
567    }
568
569    pub fn send(&self, msg: &VVal) -> VVal {
570        match self.sender.lock() {
571            Ok(guard) => {
572                let msg = AVal::from_vval(msg);
573                if let Err(e) = guard.send(msg) {
574                    VVal::err_msg(&format!("Failed to send: {}", e))
575                } else {
576                    VVal::Bol(true)
577                }
578            },
579            Err(e) =>
580                VVal::err_msg(&format!("Failed to send, can't get lock: {}", e)),
581        }
582    }
583
584    pub fn recv_timeout(&self, dur: Option<std::time::Duration>) -> VVal {
585        match self.receiver.lock() {
586            Ok(guard) => {
587                match dur {
588                    None => {
589                        match guard.recv() {
590                            Ok(av) => av.to_vval(),
591                            Err(_) =>
592                                VVal::err_msg("recv disconnected"),
593                        }
594                    },
595                    Some(dur) => {
596                        match guard.recv_timeout(dur) {
597                            Ok(av) => VVal::Opt(Some(Rc::new(av.to_vval()))),
598                            Err(RecvTimeoutError::Timeout)      => VVal::Opt(None),
599                            Err(RecvTimeoutError::Disconnected) =>
600                                VVal::err_msg("recv_timeout disconnected"),
601                        }
602                    }
603                }
604            },
605            Err(e) =>
606                VVal::err_msg(&format!("Failed to receive, can't get lock: {}", e)),
607        }
608    }
609
610    pub fn try_recv(&self) -> VVal {
611        match self.receiver.lock() {
612            Ok(guard) => {
613                match guard.try_recv() {
614                    Ok(av) => VVal::Opt(Some(Rc::new(av.to_vval()))),
615                    Err(TryRecvError::Empty) => VVal::Opt(None),
616                    Err(e) => VVal::err_msg(&format!("try_recv error: {}", e)),
617                }
618            },
619            Err(e) =>
620                VVal::err_msg(&format!("Failed to receive, can't get lock: {}", e)),
621        }
622    }
623}
624
625impl std::fmt::Debug for AValChannel {
626    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
627        write!(f, "<<AValChannel>>")
628    }
629}
630
631impl VValUserData for AValChannel {
632    fn as_any(&mut self) -> &mut dyn std::any::Any { self }
633    fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
634        let argv = env.argv_ref();
635        match key {
636            "recv" => Ok(self.recv_timeout(None)),
637            "try_recv" => Ok(self.try_recv()),
638            "recv_timeout" => {
639                if argv.len() != 1 {
640                    return
641                        Err(StackAction::panic_str(
642                            "recv_timeout method expects 1 argument".to_string(),
643                            None,
644                            env.argv()))
645                }
646
647                Ok(self.recv_timeout(Some(argv[0].to_duration()?)))
648            },
649            "send" => {
650                if argv.len() != 1 {
651                    return
652                        Err(StackAction::panic_str(
653                            "send method expects 1 argument".to_string(),
654                            None,
655                            env.argv()))
656                }
657
658                Ok(self.send(&argv[0]))
659            },
660            _ => {
661                Err(StackAction::panic_str(
662                    format!("unknown method called: {}", key),
663                    None,
664                    env.argv()))
665            },
666        }
667    }
668    fn clone_ud(&self) -> Box<dyn VValUserData> {
669        Box::new(self.clone())
670    }
671}
672
673impl AtomicAVal {
674    /// Creates a new empty instance, containing AVal::None.
675    pub fn new() -> Self {
676        Self(Arc::new(RwLock::new(AVal::None)))
677    }
678
679    /// Locks and stores the VVal.
680    pub fn write(&self, vv: &VVal) -> VVal {
681        let new_av = AVal::from_vval(vv);
682        if let Ok(mut guard) = self.0.write() {
683            *guard = new_av;
684            VVal::Bol(true)
685        } else {
686            VVal::err_msg("Lock Poisoned")
687        }
688    }
689
690    /// Locks and stores the VVal.
691    pub fn swap(&self, vv: &VVal) -> VVal {
692        let new_av = AVal::from_vval(vv);
693        if let Ok(mut guard) = self.0.write() {
694            let ret = guard.to_vval();
695            *guard = new_av;
696            ret
697        } else {
698            VVal::err_msg("Lock Poisoned")
699        }
700    }
701
702    /// Locks and reads the AVal and converts it to a VVal.
703    pub fn read(&self) -> VVal {
704        if let Ok(guard) = self.0.read() {
705            guard.to_vval()
706        } else {
707            VVal::err_msg("Lock Poisoned")
708        }
709    }
710
711    /// Locks and stores the VVal at the given key path.
712    pub fn store_at(&self, _keypath: &VVal, vv: &VVal) {
713        let new_av = AVal::from_vval(vv);
714        if let Ok(mut guard) = self.0.write() {
715            *guard = new_av;
716        }
717    }
718
    /// Meant to read the AVal at the given key path.
    ///
    /// NOTE: this is currently a stub. It neither takes the lock nor looks
    /// at `_keypath` and always returns `VVal::None`.
    pub fn read_at(&self, _keypath: &VVal) -> VVal {
        VVal::None
    }
723}
724
725impl VValUserData for AtomicAVal {
726    fn as_any(&mut self) -> &mut dyn std::any::Any { self }
727    fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
728        let args = env.argv_ref();
729        match key {
730            "read" => Ok(self.read()),
731            "swap" => {
732                Ok(self.swap(&args[0]))
733            },
734            "write" => {
735                let v = &args[0];
736                self.write(v);
737                Ok(v.clone())
738            },
739            _ => {
740                Err(StackAction::panic_str(
741                    format!("Unknown method called: {}", key),
742                    None,
743                    env.argv()))
744            },
745        }
746    }
747    fn clone_ud(&self) -> Box<dyn VValUserData> {
748        Box::new(self.clone())
749    }
750}
751
752/// This trait allows WLambda to create new threads.
753/// You can either use the `DefaultThreadCreator`
754/// default implementation or provide your own. Providing
755/// your own might be necessary if you want to customize
756/// how a thread is created.
757///
758/// If you only want to customize the GlobalEnv of newly created
759/// threads please consider just providing your own
760/// `DefaultGlobalEnvCreator`:
761///
762/// ```
763/// use std::sync::Arc;
764/// use std::sync::Mutex;
765/// use wlambda::compiler::GlobalEnv;
766/// use wlambda::threads::{DefaultThreadCreator, FunctionGlobalEnvCreator};
767///
768/// let global_env = GlobalEnv::new_default();
769/// global_env.borrow_mut().set_thread_creator(
770///     Some(Arc::new(Mutex::new(
771///         DefaultThreadCreator::new(
772///             FunctionGlobalEnvCreator::from(
773///                 Box::new(|| GlobalEnv::new_default())))))));
774/// ```
775///
776/// If you want to influence more things on thread creation,
/// see the following example.
778/// Please refer to the source of `DefaultThreadCreator`
779/// for a comprehensive example.
780///
781/// ```
782/// use wlambda::*;
783/// use wlambda::threads::*;
784/// use std::sync::Arc;
785/// use std::sync::Mutex;
786///
787/// // For simplicity we make detached threads here and don't pass any globals.
788/// pub struct CustomThreadCreator();
789///
790/// impl ThreadCreator for CustomThreadCreator {
791///     fn new_env(&mut self) -> GlobalEnvRef {
792///         GlobalEnv::new_empty_default()
793///     }
794///
795///     fn spawn(&mut self, tc: Arc<Mutex<dyn ThreadCreator>>,
796///              code: String,
797///              globals: Option<std::vec::Vec<(String, AtomicAVal)>>) -> VVal {
798///
799///         let tcc = tc.clone();
800///         let hdl =
801///             std::thread::spawn(move || {
802///                 let genv = GlobalEnv::new_empty_default();
803///                 genv.borrow_mut().set_thread_creator(Some(tcc.clone()));
804///
805///                 let mut ctx = EvalContext::new(genv);
806///
807///                 match ctx.eval(&code) {
808///                     Ok(v) => AVal::from_vval(&v),
809///                     Err(e) => {
810///                         AVal::Err(
811///                             Box::new(
812///                                 AVal::Str(format!("Error in Thread: {}", e))),
813///                             String::from("?"))
814///                     }
815///                 }
816///             });
817///         VVal::None
818///     }
819/// }
820/// ```
pub trait ThreadCreator: Send {
    /// Spawns a new thread with the given ThreadCreator.
    /// You need to pass the `tc` reference, so that the
    /// GlobalEnv of the thread also knows how to
    /// create new threads. You could even pass a different
    /// ThreadCreator for those.
    ///
    /// `code` is a String containing WLambda code which
    /// is executed after the new thread has been spawned.
    /// `globals` is a mapping of global variable names and
    /// AtomicAVal instances that are loaded into the threads
    /// global environment. This is the only way to share
    /// data between threads.
    ///
    /// What the returned VVal means is up to the implementation.
    fn spawn(&mut self, tc: Arc<Mutex<dyn ThreadCreator>>,
             code: String,
             globals: Option<std::vec::Vec<(String, AtomicAVal)>>) -> VVal;

    /// Returns a global environment.
    /// NOTE(review): presumably a fresh one for a newly spawned thread;
    /// confirm against the callers.
    fn new_env(&mut self) -> GlobalEnvRef;
}
840
841impl std::fmt::Debug for dyn ThreadCreator {
842    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
843        write!(f, "<<ThreadCreator>>")
844    }
845}
846
847/// This trait handles creation of the GlobalEnv for new
848/// threads. You can reuse the `DefaultThreadCreator` by
849/// just providing it a custom `ThreadGlobalEnvCreator` implementation.
850/// This suffices in most cases where you want to embed WLambda
851/// and provide a custom API to new threads.
pub trait ThreadGlobalEnvCreator: Send {
    /// Creates a GlobalEnv. The provided default implementation
    /// returns `GlobalEnv::new_empty_default()`.
    fn new_env(&mut self) -> GlobalEnvRef {
        GlobalEnv::new_empty_default()
    }
}
857
858/// This is an easy way to provide a `GlobalEnv` creation
859/// function to the DefaultThreadCreator.
860///
861/// ```
862/// use std::sync::Arc;
863/// use std::sync::Mutex;
864/// use wlambda::{GlobalEnv};
865/// use wlambda::threads::{DefaultThreadCreator, FunctionGlobalEnvCreator};
866///
867/// let global_env = GlobalEnv::new_default();
868/// global_env.borrow_mut().set_thread_creator(
869///     Some(Arc::new(Mutex::new(
870///         DefaultThreadCreator::new(
871///             FunctionGlobalEnvCreator::from(
872///                 Box::new(|| GlobalEnv::new_default())))))));
873/// ```
pub struct FunctionGlobalEnvCreator {
    /// The closure that is called each time a new GlobalEnv is requested.
    fun: Box<dyn FnMut() -> GlobalEnvRef + Send>,
}
877
878impl FunctionGlobalEnvCreator {
879    /// Creates a new FunctionGlobalEnvCreator from a closure.
880    ///
881    /// ```
882    /// use std::sync::Arc;
883    /// use std::sync::Mutex;
884    /// use wlambda::{GlobalEnv};
885    /// use wlambda::threads::{DefaultThreadCreator, FunctionGlobalEnvCreator};
886    ///
887    /// let global_env = GlobalEnv::new_default();
888    /// global_env.borrow_mut().set_thread_creator(
889    ///     Some(Arc::new(Mutex::new(
890    ///         DefaultThreadCreator::new(
891    ///             FunctionGlobalEnvCreator::from(
892    ///                 Box::new(|| GlobalEnv::new_default())))))));
893    /// ```
894    pub fn from(fun: Box<dyn FnMut() -> GlobalEnvRef + Send>) -> Self {
895        Self { fun }
896    }
897}
898
/// Creates the GlobalEnv by calling the stored closure.
impl ThreadGlobalEnvCreator for FunctionGlobalEnvCreator {
    fn new_env(&mut self) -> GlobalEnvRef {
        // The parentheses are needed to call a closure stored in a field.
        (self.fun)()
    }
}
904
/// Default implementation of a ThreadCreator.
///
/// It wraps a `ThreadGlobalEnvCreator`, which is handed in
/// via `DefaultThreadCreator::new`.
///
/// See also `GlobalEnv::new_default` for further information
/// how this may be used.
pub struct DefaultThreadCreator<A: ThreadGlobalEnvCreator>(A);
910
// NOTE(review): `new` takes an argument, so the lint that is allowed here
// should not fire for it anyway. Presumably a leftover, confirm before
// removing it.
#[allow(clippy::new_without_default)]
impl<A> DefaultThreadCreator<A> where A: ThreadGlobalEnvCreator {
    /// Creates a DefaultThreadCreator that wraps the given `env_creator`.
    pub fn new(env_creator: A) -> Self { Self(env_creator) }
}
915
/// To join a thread that was created by a DefaultThreadCreator
/// this JoinHandle wrapper is used. It provides a way to wrap it
/// into a `VValUserData` and use it by the WLambda function `std:thread:join`.
///
/// Cloning this handle clones the `Rc`, so all clones refer to the same
/// underlying join handle.
#[derive(Clone)]
pub(crate) struct DefaultThreadHandle {
    /// The slot created in `spawn`; a clone of it is set as the global
    /// variable `_READY` in the environment of the spawned thread.
    ready_slot:    AtomicAValSlot,
    /// The join handle of the spawned thread. It is replaced by `None`
    /// as soon as `join` has taken it.
    thread_handle: Rc<RefCell<Option<std::thread::JoinHandle<AVal>>>>,
}
924
925impl DefaultThreadHandle {
926    /// Joins the handle, and returns the result VVal of the thread.
927    pub fn join(&self) -> VVal {
928        let hdl = std::mem::replace(&mut (*self.thread_handle.borrow_mut()), None);
929        if let Some(h) = hdl {
930            h.join().unwrap().to_vval()
931        } else {
932            VVal::err_msg("DefaultThreadHandle already joined!")
933        }
934    }
935
936    /// Returns an AtomicAValSlot, that can be used to check if the thread
937    /// successfully started or ran into an error.
938    pub fn get_ready_slot(&self) -> VVal {
939        VVal::Usr(Box::new(self.ready_slot.clone()))
940    }
941}
942
943impl VValUserData for DefaultThreadHandle {
944    fn as_any(&mut self) -> &mut dyn std::any::Any { self }
945    fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
946        match key {
947            "join"       => Ok(self.join()),
948            "recv_ready" => {
949                if let VVal::Usr(b) = self.get_ready_slot() {
950                    b.call_method("recv", env)
951                } else {
952                    Ok(VVal::err_msg("Invalid _READY value!"))
953                }
954            },
955            _ => {
956                Err(StackAction::panic_str(
957                    format!("Unknown method called: {}", key),
958                    None,
959                    env.argv()))
960            },
961        }
962    }
963    fn clone_ud(&self) -> Box<dyn crate::vval::VValUserData> {
964        Box::new(self.clone())
965    }
966}
967
968impl<A> ThreadCreator for DefaultThreadCreator<A> where A: ThreadGlobalEnvCreator {
969    fn new_env(&mut self) -> GlobalEnvRef { self.0.new_env() }
970
971    fn spawn(&mut self, tc: Arc<Mutex<dyn ThreadCreator>>,
972             code: String,
973             globals: Option<std::vec::Vec<(String, AtomicAVal)>>) -> VVal {
974
975        let ready = AtomicAValSlot::new();
976        let tcc = tc.clone();
977
978        let trdy = ready.clone();
979        let hdl =
980            std::thread::spawn(move || {
981                let genv =
982                    match tcc.lock() {
983                        Ok(mut tcc) => { (*tcc).new_env() },
984                        Err(e) => panic!("Can't lock ThreadCreator: {}", e),
985                    };
986
987                genv.borrow_mut().set_thread_creator(Some(tcc.clone()));
988
989                genv.borrow_mut().set_var(
990                    "_READY",
991                    &VVal::Usr(Box::new(trdy.clone())));
992
993                if let Some(globals) = globals {
994                    for (k, av) in globals {
995                        let v = av.read();
996                        genv.borrow_mut().set_var(&k, &v);
997                    }
998                }
999
1000                let mut ctx = EvalContext::new(genv);
1001
1002                match ctx.eval(&code) {
1003                    Ok(v) => AVal::from_vval(&v),
1004                    Err(e) => {
1005                        eprintln!("Error in thread: {}", e);
1006                        let ret =
1007                            AVal::Err(
1008                                Box::new(
1009                                    AVal::Str(format!("Error in thread: {}", e))),
1010                                String::from("?"));
1011                        trdy.send(&ret.to_vval());
1012                        ret
1013                    }
1014                }
1015            });
1016
1017        VVal::Usr(Box::new(DefaultThreadHandle {
1018            ready_slot: ready,
1019            thread_handle: Rc::new(RefCell::new(Some(hdl)))
1020        }))
1021    }
1022}