1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use libc::c_int;
use process_manager::PROCESS_MANAGER;
use shell_child::ShellChildArc;
use std::any::Any;
use std::cell::RefCell;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::thread::ThreadId;

/// Thread local shell.
///
/// Keeps track of the child processes registered on it and remembers
/// whether a signal has been sent through it.
pub struct LocalShell {
    /// Handles of the child processes currently registered on this shell.
    processes: Vec<ShellChildArc>,
    /// Becomes `true` once `signal` has been called on this shell.
    signaled: bool,
}

impl LocalShell {
    /// Builds a shell with no registered processes and the signaled flag
    /// cleared.
    fn new() -> LocalShell {
        LocalShell {
            signaled: false,
            processes: Vec::new(),
        }
    }

    /// Registers `process` on this shell by storing a clone of its handle.
    pub fn add_process(&mut self, process: &ShellChildArc) {
        let handle = process.clone();
        self.processes.push(handle);
    }

    /// Unregisters `process`.
    ///
    /// Handles are compared by pointer identity, so every stored handle
    /// that points at the same allocation as `process` is dropped.
    pub fn remove_process(&mut self, process: &ShellChildArc) {
        self.processes
            .retain(|registered| !Arc::ptr_eq(registered, process));
    }

    /// Marks the shell as signaled and sends `signal` to every registered
    /// process that still holds a child.
    ///
    /// Delivery failures are logged and do not stop the loop.
    ///
    /// # Panics
    ///
    /// Panics if a process lock is poisoned.
    pub fn signal(&mut self, signal: c_int) {
        self.signaled = true;
        for process in self.processes.iter() {
            let guard = process.read().unwrap();
            let child = match guard.as_ref() {
                Some(child) => child,
                // Nothing left in this slot; nothing to signal.
                None => continue,
            };
            if let Err(error) = child.signal(signal) {
                error!("Failed to send a signal {:?}", error);
            }
        }
    }

    /// Takes the child out of every registered process and waits on it.
    ///
    /// The write lock of a process is held while its child is waited on.
    /// Wait failures are logged and do not stop the loop.
    ///
    /// # Panics
    ///
    /// Panics if a process lock is poisoned.
    pub fn wait(&mut self) {
        for process in self.processes.iter() {
            let mut guard = process.write().unwrap();
            match guard.take() {
                Some(child) => {
                    if let Err(error) = child.wait() {
                        error!("Failed to wait process {:?}", error);
                    }
                }
                None => {}
            }
        }
    }

    /// Reports whether `signal` has been called on this shell.
    pub fn signaled(&self) -> bool {
        self.signaled
    }
}

/// Registration guard for a thread's shell.
///
/// Field `0` is the id of the thread that created the scope and field `1`
/// is the shell itself. Construction registers the shell with
/// `PROCESS_MANAGER` under that thread id; dropping the scope removes the
/// registration again.
struct LocalShellScope(ThreadId, Arc<Mutex<LocalShell>>);

impl LocalShellScope {
    /// Registers `arc` with `PROCESS_MANAGER` under the calling thread's id
    /// and returns a scope that keeps its own clone of the shell.
    ///
    /// # Panics
    ///
    /// Panics if the `PROCESS_MANAGER` lock is poisoned.
    fn new(arc: &Arc<Mutex<LocalShell>>) -> LocalShellScope {
        let thread_id = thread::current().id();
        let mut manager = PROCESS_MANAGER.lock().unwrap();
        manager.add_local_shell(&thread_id, arc);

        LocalShellScope(thread_id, Arc::clone(arc))
    }
}

impl Default for LocalShellScope {
    /// Creates a fresh, empty shell and registers it for the calling
    /// thread.
    fn default() -> Self {
        let shell = Arc::new(Mutex::new(LocalShell::new()));
        Self::new(&shell)
    }
}

impl Drop for LocalShellScope {
    /// Removes the registration stored under this scope's thread id from
    /// `PROCESS_MANAGER`.
    ///
    /// Panics if the `PROCESS_MANAGER` lock is poisoned.
    fn drop(&mut self) {
        let thread_id = &self.0;
        let mut manager = PROCESS_MANAGER.lock().unwrap();
        manager.remove_local_shell(thread_id);
    }
}

/// Handle to a thread started with `spawn`.
///
/// Pairs the thread's `JoinHandle` with the shell created for that thread,
/// so the holder can both join the thread and signal its shell.
pub struct ShellHandle<T> {
    /// Join handle of the spawned thread.
    join_handle: JoinHandle<T>,
    /// Shell that the spawned thread registers for itself.
    shell: Arc<Mutex<LocalShell>>,
}

impl<T> ShellHandle<T> {
    pub fn signal(&self, signal: c_int) {
        let mut lock = self.shell.lock().unwrap();
        lock.signal(signal);
    }

    pub fn join(self) -> Result<T, Box<dyn Any + Send + 'static>> {
        self.join_handle.join()
    }
}

impl<T> Deref for ShellHandle<T> {
    type Target = JoinHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.join_handle
    }
}

/// Spawns a thread that runs `f` with a dedicated shell.
///
/// A new shell is created up front. The spawned thread installs it as its
/// thread-local shell scope before calling `f`, and the returned handle
/// keeps a reference to the same shell so the caller can signal it.
///
/// # Panics
///
/// The spawned thread panics with "Shell has already registered" if its
/// thread-local scope is already populated when it starts.
pub fn spawn<F, T>(f: F) -> ShellHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    let shell = Arc::new(Mutex::new(LocalShell::new()));
    let thread_shell = Arc::clone(&shell);

    let join_handle = thread::spawn(move || {
        LOCAL_SHELL_SCOPE.with(|slot| {
            let mut slot = slot.borrow_mut();
            if slot.is_some() {
                panic!("Shell has already registered");
            }
            *slot = Some(LocalShellScope::new(&thread_shell));
        });
        f()
    });

    ShellHandle { join_handle, shell }
}

/// Returns the shell bound to the calling thread, creating and registering
/// one on first use.
///
/// # Panics
///
/// Panics if the thread-local slot is already mutably borrowed, or if the
/// `PROCESS_MANAGER` lock is poisoned while a new scope is being
/// registered.
pub fn current_shell() -> Arc<Mutex<LocalShell>> {
    LOCAL_SHELL_SCOPE.with(|shell| {
        shell
            .borrow_mut()
            // The scope must be built lazily. Constructing a
            // `LocalShellScope` registers a shell for this thread id, and
            // dropping one removes the registration for this thread id.
            // Building a throwaway scope when the slot is already filled
            // would therefore unregister the thread's live shell.
            .get_or_insert_with(LocalShellScope::default)
            .1
            .clone()
    })
}

// Per-thread slot holding the thread's shell scope. It starts out empty
// and is filled either by `spawn` (inside the new thread) or lazily by
// `current_shell`.
thread_local! {
    static LOCAL_SHELL_SCOPE: RefCell<Option<LocalShellScope>> =
        RefCell::new(None);
}