nu-protocol 0.112.2

Nushell's internal protocols, including its abstract syntax tree
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    sync::{
        Arc, Mutex,
        mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
    },
};

#[cfg(not(target_family = "wasm"))]
use nu_utils::time::Instant;
#[cfg(not(target_family = "wasm"))]
use std::time::Duration;

use nu_system::{UnfreezeHandle, kill_by_pid};

use crate::{PipelineData, Signals, shell_error};

use crate::JobId;

/// Table of the jobs known to this shell, keyed by [`JobId`].
pub struct Jobs {
    /// Numeric ID used by the next call to `add_job`; incremented after every addition.
    next_job_id: usize,

    /// This is the ID of the most recently added frozen job in the jobs table.
    /// The methods of this struct must ensure the invariant of this always
    /// being `None` or pointing to a valid job in the table.
    last_frozen_job_id: Option<JobId>,
    /// Every job currently in the table.
    jobs: HashMap<JobId, Job>,
}

impl Default for Jobs {
    fn default() -> Self {
        Self {
            next_job_id: 1,
            last_frozen_job_id: None,
            jobs: HashMap::default(),
        }
    }
}

impl Jobs {
    /// Iterates over every job in the table together with its ID, in unspecified order.
    pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
        self.jobs.iter().map(|(k, v)| (*k, v))
    }

    /// Returns the job with the given ID, if it is in the table.
    pub fn lookup(&self, id: JobId) -> Option<&Job> {
        self.jobs.get(&id)
    }

    /// Returns a mutable reference to the job with the given ID, if it is in the table.
    pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
        self.jobs.get_mut(&id)
    }

    /// Removes the job with the given ID from the table and returns it, if it was present.
    pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
        // Keep the frozen-job marker pointing only at jobs that are still in the table.
        if self.last_frozen_job_id.is_some_and(|last| id == last) {
            self.last_frozen_job_id = None;
        }

        self.jobs.remove(&id)
    }

    /// Records `id` as the most recently added frozen job when `job` is frozen.
    fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
        if let Job::Frozen(_) = job {
            self.last_frozen_job_id = Some(id);
        }
    }

    /// Adds `job` under a freshly allocated ID and returns that ID.
    ///
    /// If `job` is frozen it becomes the most recently added frozen job.
    pub fn add_job(&mut self, job: Job) -> JobId {
        let this_id = JobId::new(self.next_job_id);

        self.assign_last_frozen_id_if_frozen(this_id, &job);

        self.jobs.insert(this_id, job);
        self.next_job_id += 1;

        this_id
    }

    /// Returns the ID of the most recently added frozen job.
    ///
    /// Returns `None` if no frozen job has been added, or if that job has since been removed.
    pub fn most_recent_frozen_job_id(&self) -> Option<JobId> {
        self.last_frozen_job_id
    }

    /// Inserts `job` under a caller-chosen `id`.
    ///
    /// This is useful when you want to remove a job from the list and add it back later.
    /// NOTE: this does not advance the internal ID counter, so `id` is expected to be one
    /// previously handed out by `add_job`.
    ///
    /// # Errors
    /// Returns `Err("job already exists")` if `id` is already taken. In that case the table,
    /// including the most-recent-frozen-job marker, is left unchanged.
    pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
        let std::collections::hash_map::Entry::Vacant(slot) = self.jobs.entry(id) else {
            return Err("job already exists");
        };

        let is_frozen = matches!(job, Job::Frozen(_));
        slot.insert(job);

        // Only move the frozen-job marker once the insertion has actually happened.
        if is_frozen {
            self.last_frozen_job_id = Some(id);
        }

        Ok(())
    }

    /// This function tries to forcefully kill a job from this job table,
    /// removes it from the job table. It always succeeds in removing the job
    /// from the table, but may fail in killing the job's active processes.
    ///
    /// Unknown IDs are ignored and yield `Ok(())`.
    pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
        let Some(job) = self.jobs.get(&id) else {
            return Ok(());
        };

        let result = job.kill();

        self.remove_job(id);

        result
    }

    /// This function tries to forcefully kill all the background jobs and
    /// removes all of them from the job table.
    ///
    /// It returns the first error if any of the job killing attempts fails, but always
    /// attempts every job and always succeeds in removing the jobs from the table.
    pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
        self.last_frozen_job_id = None;

        // `fold` drives the whole iterator, so one failure does not skip the remaining jobs.
        let first_err = self
            .jobs
            .values()
            .map(Job::kill)
            .fold(None, |first, result| first.or(result.err()));

        self.jobs.clear();

        first_err.map_or(Ok(()), Err)
    }
}

/// An entry in the job table.
pub enum Job {
    /// A job executing as a background thread; see [`ThreadJob`].
    Thread(ThreadJob),
    /// A frozen job holding an `UnfreezeHandle`; see [`FrozenJob`].
    Frozen(FrozenJob),
}

/// A thread job represents a job that is currently executing as a background thread in nushell.
/// This is an Arc-y type: cloning it does not uniquely clone the information of this particular
/// job (for example, the pid set is shared through an `Arc`).

// Although rust's documentation does not document the acquire-release semantics of Mutex, this
// is a direct undocumented requirement of its soundness, and is thus assumed by this
// implementation.
// see issue https://github.com/rust-lang/rust/issues/126239.
#[derive(Clone)]
pub struct ThreadJob {
    /// Interrupt signals for this job; `kill` triggers them and `try_add_pid` checks them.
    signals: Signals,
    /// PIDs of the external processes currently tracked for this job.
    pids: Arc<Mutex<HashSet<u32>>>,
    /// Optional description of the job, exposed through `Job::description`.
    description: Option<String>,
    /// Channel sender for delivering [`Mail`] to this job (presumably paired with the
    /// job's `Mailbox` receiver — verify at the construction site).
    pub sender: Sender<Mail>,
}

impl ThreadJob {
    /// Creates a thread job from its interrupt signals, optional description and mail sender.
    ///
    /// The new job starts with an empty pid set.
    pub fn new(signals: Signals, description: Option<String>, sender: Sender<Mail>) -> Self {
        ThreadJob {
            signals,
            pids: Arc::new(Mutex::new(HashSet::default())),
            sender,
            description,
        }
    }

    /// Tries to add the provided pid to the active pid set of the current job.
    ///
    /// Returns true if the pid was added successfully, or false if the
    /// current job is interrupted.
    ///
    /// # Panics
    /// Panics if the pid mutex is poisoned.
    pub fn try_add_pid(&self, pid: u32) -> bool {
        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");

        // note: this signals check must occur after the pids lock has been locked.
        if self.signals.interrupted() {
            false
        } else {
            pids.insert(pid);
            true
        }
    }

    /// Returns a snapshot of the pids currently tracked by this job, in unspecified order.
    ///
    /// # Panics
    /// Panics if the pid mutex is poisoned.
    pub fn collect_pids(&self) -> Vec<u32> {
        let lock = self.pids.lock().expect("PID lock was poisoned");

        lock.iter().copied().collect()
    }

    /// Interrupts this job and forcefully kills every process in its pid set.
    ///
    /// Every tracked process is attempted, even if killing an earlier one fails.
    /// Pids that were killed successfully are removed from the set. Pids whose kill
    /// failed are kept in it.
    ///
    /// # Errors
    /// Returns the first error reported by `kill_by_pid`, if any.
    ///
    /// # Panics
    /// Panics if the pid mutex is poisoned.
    pub fn kill(&self) -> shell_error::io::Result<()> {
        // It's okay to make this interrupt outside of the mutex, since it has acquire-release
        // semantics. A concurrent `try_add_pid` either inserts its pid before we take the lock
        // (so the loop below sees it) or observes the interrupt and refuses to insert.
        self.signals.trigger();

        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");

        // Attempt every pid instead of bailing out on the first failure, so one stubborn
        // process does not leave the others running.
        let mut result = Ok(());
        pids.retain(|&pid| match kill_by_pid(pid.into()) {
            Ok(_) => false,
            Err(err) => {
                if result.is_ok() {
                    result = Err(err);
                }
                true
            }
        });

        Ok(result?)
    }

    /// Stops tracking `pid` for this job; unknown pids are ignored.
    ///
    /// # Panics
    /// Panics if the pid mutex is poisoned.
    pub fn remove_pid(&self, pid: u32) {
        let mut pids = self.pids.lock().expect("PID lock was poisoned");

        pids.remove(&pid);
    }
}

impl Job {
    /// Forcefully kills this job by delegating to the variant's own `kill`.
    pub fn kill(&self) -> shell_error::io::Result<()> {
        match self {
            Self::Thread(job) => job.kill(),
            Self::Frozen(job) => job.kill(),
        }
    }

    /// Returns this job's description, if one has been assigned.
    pub fn description(&self) -> Option<&String> {
        let description = match self {
            Self::Thread(job) => &job.description,
            Self::Frozen(job) => &job.description,
        };

        description.as_ref()
    }

    /// Replaces this job's description (pass `None` to clear it).
    pub fn assign_description(&mut self, description: Option<String>) {
        let slot = match self {
            Self::Thread(job) => &mut job.description,
            Self::Frozen(job) => &mut job.description,
        };

        *slot = description;
    }
}

/// A frozen job in the job table.
pub struct FrozenJob {
    /// Handle for unfreezing the job's process (going by its type name); on unix,
    /// `FrozenJob::kill` targets the pid it reports.
    pub unfreeze: UnfreezeHandle,
    /// Optional description of the job, exposed through `Job::description`.
    pub description: Option<String>,
}

impl FrozenJob {
    /// Forcefully kills the process behind this frozen job.
    ///
    /// On unix this kills the pid reported by the unfreeze handle. Frozen jobs do not
    /// happen outside unix, so elsewhere this is a no-op that returns `Ok(())`.
    pub fn kill(&self) -> shell_error::io::Result<()> {
        #[cfg(unix)]
        {
            let pid = self.unfreeze.pid() as i64;
            kill_by_pid(pid)?;
            Ok(())
        }

        #[cfg(not(unix))]
        {
            Ok(())
        }
    }
}

/// Stores the information about the background job currently being executed by this thread, if any
#[derive(Clone)]
pub struct CurrentJob {
    /// ID of the job this thread is running as.
    pub id: JobId,

    /// The background thread job associated with this thread.
    /// If `None`, it indicates this thread is currently the main job.
    pub background_thread_job: Option<ThreadJob>,

    /// Mailbox holding this job's unread messages.
    ///
    /// Note: although the mailbox is `Mutex`'d, it is only ever accessed
    /// by the current job's threads.
    pub mailbox: Arc<Mutex<Mailbox>>,
}

/// The storage for unread messages.
///
/// Messages are initially sent over an mpsc channel,
/// and may then be stored in an `IgnoredMail` struct when
/// filtered out by a message tag.
pub struct Mailbox {
    /// Receiving end of the channel that new mail arrives on.
    receiver: Receiver<Mail>,
    /// Messages already received from `receiver` but skipped by a tag filter.
    ignored_mail: IgnoredMail,
}

impl Mailbox {
    /// Creates a mailbox that reads from `receiver` and starts with no ignored mail.
    pub fn new(receiver: Receiver<Mail>) -> Self {
        Mailbox {
            receiver,
            ignored_mail: IgnoredMail::default(),
        }
    }

    /// Waits up to `timeout` for a message matching `filter_tag`.
    ///
    /// A `filter_tag` of `None` matches any message; `Some(tag)` matches only messages
    /// carrying exactly that tag.
    ///
    /// Previously ignored mail is checked first. Non-matching messages pulled from the
    /// channel are stored as ignored mail for later calls. The channel is always polled
    /// at least once, so already-pending mail is returned even when `timeout` is zero.
    ///
    /// # Errors
    /// - [`RecvTimeoutError::Timeout`] if no matching message arrived within `timeout`.
    /// - [`RecvTimeoutError::Disconnected`] if the channel reports that all senders are gone.
    #[cfg(not(target_family = "wasm"))]
    pub fn recv_timeout(
        &mut self,
        filter_tag: Option<FilterTag>,
        timeout: Duration,
    ) -> Result<PipelineData, RecvTimeoutError> {
        if let Some(value) = self.ignored_mail.pop(filter_tag) {
            return Ok(value);
        }

        let mut waited_so_far = Duration::ZERO;
        let mut before = Instant::now();

        loop {
            // Poll before checking the elapsed time so a zero timeout still sees pending mail.
            let (tag, value) = self
                .receiver
                .recv_timeout(timeout.saturating_sub(waited_so_far))?;

            if filter_tag.is_none() || filter_tag == tag {
                return Ok(value);
            }

            self.ignored_mail.add((tag, value));

            // Only the time spent so far counts against the budget for the next wait.
            let now = Instant::now();
            waited_so_far += now - before;
            before = now;

            if waited_so_far >= timeout {
                return Err(RecvTimeoutError::Timeout);
            }
        }
    }

    /// Returns a message matching `filter_tag` without blocking.
    ///
    /// Ignored mail is checked first. Then every queued channel message is examined:
    /// non-matching ones are stored as ignored mail.
    ///
    /// # Errors
    /// Returns the channel's [`TryRecvError`] (`Empty` or `Disconnected`) once no matching
    /// message is available.
    #[cfg(not(target_family = "wasm"))]
    pub fn try_recv(
        &mut self,
        filter_tag: Option<FilterTag>,
    ) -> Result<PipelineData, TryRecvError> {
        if let Some(value) = self.ignored_mail.pop(filter_tag) {
            return Ok(value);
        }

        loop {
            let (tag, value) = self.receiver.try_recv()?;

            if filter_tag.is_none() || filter_tag == tag {
                return Ok(value);
            }

            self.ignored_mail.add((tag, value));
        }
    }

    /// Discards all ignored mail and every message currently queued in the channel.
    pub fn clear(&mut self) {
        self.ignored_mail.clear();

        while self.receiver.try_recv().is_ok() {}
    }
}

/// A data structure used to store messages which were received, but are currently ignored by a
/// tag filter. Messages are added and popped in a first-in-first-out manner.
#[derive(Default)]
struct IgnoredMail {
    /// ID given to the next added message; it only ever increases, so IDs encode arrival order.
    next_id: usize,
    /// Every stored message, keyed (and therefore ordered) by its arrival ID.
    messages: BTreeMap<usize, Mail>,
    /// For each tag, the arrival IDs of stored messages carrying that tag.
    by_tag: HashMap<FilterTag, BTreeSet<usize>>,
}

/// Tag that can be attached to a message so receivers can filter for it.
pub type FilterTag = u64;
/// A message: an optional filter tag together with its payload.
pub type Mail = (Option<FilterTag>, PipelineData);

impl IgnoredMail {
    /// Stores a filtered-out message, stamping it with the next arrival ID.
    ///
    /// Tagged messages are also indexed under their tag; untagged messages can only be
    /// retrieved through an untagged `pop`.
    pub fn add(&mut self, mail: Mail) {
        let arrival = self.next_id;
        self.next_id += 1;

        if let Some(tag) = mail.0 {
            self.by_tag.entry(tag).or_default().insert(arrival);
        }

        self.messages.insert(arrival, mail);
    }

    /// Removes and returns the oldest stored message matching `tag`.
    ///
    /// `None` matches any message, tagged or not; `Some(t)` matches only messages tagged `t`.
    pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
        match tag {
            Some(wanted) => self.pop_oldest_with_tag(wanted),
            None => self.pop_oldest(),
        }
    }

    /// Drops every stored message together with the tag index (arrival IDs keep counting).
    pub fn clear(&mut self) {
        self.by_tag.clear();
        self.messages.clear();
    }

    /// Removes and returns the oldest stored message, whatever its tag.
    fn pop_oldest(&mut self) -> Option<PipelineData> {
        let (arrival, (tag, value)) = self.messages.pop_first()?;

        if let Some(tag) = tag {
            self.forget_tagged_id(tag, arrival);
        }

        Some(value)
    }

    /// Removes and returns the oldest stored message carrying `tag`.
    fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
        let arrival = *self.by_tag.get(&tag)?.first()?;
        self.forget_tagged_id(tag, arrival);

        self.messages.remove(&arrival).map(|(_, value)| value)
    }

    /// Drops `arrival` from the index of `tag`, deleting the tag's entry once it is empty.
    fn forget_tagged_id(&mut self, tag: FilterTag, arrival: usize) {
        let Some(ids) = self.by_tag.get_mut(&tag) else {
            return;
        };

        ids.remove(&arrival);

        if ids.is_empty() {
            self.by_tag.remove(&tag);
        }
    }
}