alacritty_terminal/
event_loop.rs

1//! The main event loop which performs I/O on the pseudoterminal.
2
3use std::borrow::Cow;
4use std::collections::VecDeque;
5use std::fmt::{self, Display, Formatter};
6use std::fs::File;
7use std::io::{self, ErrorKind, Read, Write};
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
11use std::thread::JoinHandle;
12use std::time::Instant;
13
14use log::error;
15use polling::{Event as PollingEvent, Events, PollMode};
16
17use crate::event::{self, Event, EventListener, WindowSize};
18use crate::sync::FairMutex;
19use crate::term::Term;
20use crate::{thread, tty};
21use vte::ansi;
22
23/// Max bytes to read from the PTY before forced terminal synchronization.
24pub(crate) const READ_BUFFER_SIZE: usize = 0x10_0000;
25
26/// Max bytes to read from the PTY while the terminal is locked.
27const MAX_LOCKED_READ: usize = u16::MAX as usize;
28
29/// Messages that may be sent to the `EventLoop`.
30#[derive(Debug)]
31pub enum Msg {
32    /// Data that should be written to the PTY.
33    Input(Cow<'static, [u8]>),
34
35    /// Indicates that the `EventLoop` should shut down, as Alacritty is shutting down.
36    Shutdown,
37
38    /// Instruction to resize the PTY.
39    Resize(WindowSize),
40}
41
42/// The main event loop.
43///
44/// Handles all the PTY I/O and runs the PTY parser which updates terminal
45/// state.
46pub struct EventLoop<T: tty::EventedPty, U: EventListener> {
47    poll: Arc<polling::Poller>,
48    pty: T,
49    rx: PeekableReceiver<Msg>,
50    tx: Sender<Msg>,
51    terminal: Arc<FairMutex<Term<U>>>,
52    event_proxy: U,
53    drain_on_exit: bool,
54    ref_test: bool,
55}
56
57impl<T, U> EventLoop<T, U>
58where
59    T: tty::EventedPty + event::OnResize + Send + 'static,
60    U: EventListener + Send + 'static,
61{
62    /// Create a new event loop.
63    pub fn new(
64        terminal: Arc<FairMutex<Term<U>>>,
65        event_proxy: U,
66        pty: T,
67        drain_on_exit: bool,
68        ref_test: bool,
69    ) -> io::Result<EventLoop<T, U>> {
70        let (tx, rx) = mpsc::channel();
71        let poll = polling::Poller::new()?.into();
72        Ok(EventLoop {
73            poll,
74            pty,
75            tx,
76            rx: PeekableReceiver::new(rx),
77            terminal,
78            event_proxy,
79            drain_on_exit,
80            ref_test,
81        })
82    }
83
84    pub fn channel(&self) -> EventLoopSender {
85        EventLoopSender { sender: self.tx.clone(), poller: self.poll.clone() }
86    }
87
88    /// Drain the channel.
89    ///
90    /// Returns `false` when a shutdown message was received.
91    fn drain_recv_channel(&mut self, state: &mut State) -> bool {
92        while let Some(msg) = self.rx.recv() {
93            match msg {
94                Msg::Input(input) => state.write_list.push_back(input),
95                Msg::Resize(window_size) => self.pty.on_resize(window_size),
96                Msg::Shutdown => return false,
97            }
98        }
99
100        true
101    }
102
103    #[inline]
104    fn pty_read<X>(
105        &mut self,
106        state: &mut State,
107        buf: &mut [u8],
108        mut writer: Option<&mut X>,
109    ) -> io::Result<()>
110    where
111        X: Write,
112    {
113        let mut unprocessed = 0;
114        let mut processed = 0;
115
116        // Reserve the next terminal lock for PTY reading.
117        let _terminal_lease = Some(self.terminal.lease());
118        let mut terminal = None;
119
120        loop {
121            // Read from the PTY.
122            match self.pty.reader().read(&mut buf[unprocessed..]) {
123                // This is received on Windows/macOS when no more data is readable from the PTY.
124                Ok(0) if unprocessed == 0 => break,
125                Ok(got) => unprocessed += got,
126                Err(err) => match err.kind() {
127                    ErrorKind::Interrupted | ErrorKind::WouldBlock => {
128                        // Go back to mio if we're caught up on parsing and the PTY would block.
129                        if unprocessed == 0 {
130                            break;
131                        }
132                    },
133                    _ => return Err(err),
134                },
135            }
136
137            // Attempt to lock the terminal.
138            let terminal = match &mut terminal {
139                Some(terminal) => terminal,
140                None => terminal.insert(match self.terminal.try_lock_unfair() {
141                    // Force block if we are at the buffer size limit.
142                    None if unprocessed >= READ_BUFFER_SIZE => self.terminal.lock_unfair(),
143                    None => continue,
144                    Some(terminal) => terminal,
145                }),
146            };
147
148            // Write a copy of the bytes to the ref test file.
149            if let Some(writer) = &mut writer {
150                writer.write_all(&buf[..unprocessed]).unwrap();
151            }
152
153            // Parse the incoming bytes.
154            state.parser.advance(&mut **terminal, &buf[..unprocessed]);
155
156            processed += unprocessed;
157            unprocessed = 0;
158
159            // Assure we're not blocking the terminal too long unnecessarily.
160            if processed >= MAX_LOCKED_READ {
161                break;
162            }
163        }
164
165        // Queue terminal redraw unless all processed bytes were synchronized.
166        if state.parser.sync_bytes_count() < processed && processed > 0 {
167            self.event_proxy.send_event(Event::Wakeup);
168        }
169
170        Ok(())
171    }
172
173    #[inline]
174    fn pty_write(&mut self, state: &mut State) -> io::Result<()> {
175        state.ensure_next();
176
177        'write_many: while let Some(mut current) = state.take_current() {
178            'write_one: loop {
179                match self.pty.writer().write(current.remaining_bytes()) {
180                    Ok(0) => {
181                        state.set_current(Some(current));
182                        break 'write_many;
183                    },
184                    Ok(n) => {
185                        current.advance(n);
186                        if current.finished() {
187                            state.goto_next();
188                            break 'write_one;
189                        }
190                    },
191                    Err(err) => {
192                        state.set_current(Some(current));
193                        match err.kind() {
194                            ErrorKind::Interrupted | ErrorKind::WouldBlock => break 'write_many,
195                            _ => return Err(err),
196                        }
197                    },
198                }
199            }
200        }
201
202        Ok(())
203    }
204
205    pub fn spawn(mut self) -> JoinHandle<(Self, State)> {
206        thread::spawn_named("PTY reader", move || {
207            let mut state = State::default();
208            let mut buf = [0u8; READ_BUFFER_SIZE];
209
210            let poll_opts = PollMode::Level;
211            let mut interest = PollingEvent::readable(0);
212
213            // Register TTY through EventedRW interface.
214            if let Err(err) = unsafe { self.pty.register(&self.poll, interest, poll_opts) } {
215                error!("Event loop registration error: {err}");
216                return (self, state);
217            }
218
219            let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap());
220
221            let mut pipe = if self.ref_test {
222                Some(File::create("./alacritty.recording").expect("create alacritty recording"))
223            } else {
224                None
225            };
226
227            'event_loop: loop {
228                // Wakeup the event loop when a synchronized update timeout was reached.
229                let handler = state.parser.sync_timeout();
230                let timeout =
231                    handler.sync_timeout().map(|st| st.saturating_duration_since(Instant::now()));
232
233                events.clear();
234                if let Err(err) = self.poll.wait(&mut events, timeout) {
235                    match err.kind() {
236                        ErrorKind::Interrupted => continue,
237                        _ => {
238                            error!("Event loop polling error: {err}");
239                            break 'event_loop;
240                        },
241                    }
242                }
243
244                // Handle synchronized update timeout.
245                if events.is_empty() && self.rx.peek().is_none() {
246                    state.parser.stop_sync(&mut *self.terminal.lock());
247                    self.event_proxy.send_event(Event::Wakeup);
248                    continue;
249                }
250
251                // Handle channel events, if there are any.
252                if !self.drain_recv_channel(&mut state) {
253                    break;
254                }
255
256                for event in events.iter() {
257                    match event.key {
258                        tty::PTY_CHILD_EVENT_TOKEN => {
259                            if let Some(tty::ChildEvent::Exited(code)) = self.pty.next_child_event()
260                            {
261                                if let Some(code) = code {
262                                    self.event_proxy.send_event(Event::ChildExit(code));
263                                }
264                                if self.drain_on_exit {
265                                    let _ = self.pty_read(&mut state, &mut buf, pipe.as_mut());
266                                }
267                                self.terminal.lock().exit();
268                                self.event_proxy.send_event(Event::Wakeup);
269                                break 'event_loop;
270                            }
271                        },
272
273                        tty::PTY_READ_WRITE_TOKEN => {
274                            if event.is_interrupt() {
275                                // Don't try to do I/O on a dead PTY.
276                                continue;
277                            }
278
279                            if event.readable {
280                                if let Err(err) = self.pty_read(&mut state, &mut buf, pipe.as_mut())
281                                {
282                                    // On Linux, a `read` on the master side of a PTY can fail
283                                    // with `EIO` if the client side hangs up.  In that case,
284                                    // just loop back round for the inevitable `Exited` event.
285                                    // This sucks, but checking the process is either racy or
286                                    // blocking.
287                                    #[cfg(target_os = "linux")]
288                                    if err.raw_os_error() == Some(libc::EIO) {
289                                        continue;
290                                    }
291
292                                    error!("Error reading from PTY in event loop: {err}");
293                                    break 'event_loop;
294                                }
295                            }
296
297                            if event.writable {
298                                if let Err(err) = self.pty_write(&mut state) {
299                                    error!("Error writing to PTY in event loop: {err}");
300                                    break 'event_loop;
301                                }
302                            }
303                        },
304                        _ => (),
305                    }
306                }
307
308                // Register write interest if necessary.
309                let needs_write = state.needs_write();
310                if needs_write != interest.writable {
311                    interest.writable = needs_write;
312
313                    // Re-register with new interest.
314                    self.pty.reregister(&self.poll, interest, poll_opts).unwrap();
315                }
316            }
317
318            // The evented instances are not dropped here so deregister them explicitly.
319            let _ = self.pty.deregister(&self.poll);
320
321            (self, state)
322        })
323    }
324}
325
326/// Helper type which tracks how much of a buffer has been written.
327struct Writing {
328    source: Cow<'static, [u8]>,
329    written: usize,
330}
331
332pub struct Notifier(pub EventLoopSender);
333
334impl event::Notify for Notifier {
335    fn notify<B>(&self, bytes: B)
336    where
337        B: Into<Cow<'static, [u8]>>,
338    {
339        let bytes = bytes.into();
340        // Terminal hangs if we send 0 bytes through.
341        if bytes.is_empty() {
342            return;
343        }
344
345        let _ = self.0.send(Msg::Input(bytes));
346    }
347}
348
349impl event::OnResize for Notifier {
350    fn on_resize(&mut self, window_size: WindowSize) {
351        let _ = self.0.send(Msg::Resize(window_size));
352    }
353}
354
355#[derive(Debug)]
356pub enum EventLoopSendError {
357    /// Error polling the event loop.
358    Io(io::Error),
359
360    /// Error sending a message to the event loop.
361    Send(mpsc::SendError<Msg>),
362}
363
364impl Display for EventLoopSendError {
365    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
366        match self {
367            EventLoopSendError::Io(err) => err.fmt(f),
368            EventLoopSendError::Send(err) => err.fmt(f),
369        }
370    }
371}
372
373impl std::error::Error for EventLoopSendError {
374    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
375        match self {
376            EventLoopSendError::Io(err) => err.source(),
377            EventLoopSendError::Send(err) => err.source(),
378        }
379    }
380}
381
382#[derive(Clone)]
383pub struct EventLoopSender {
384    sender: Sender<Msg>,
385    poller: Arc<polling::Poller>,
386}
387
388impl EventLoopSender {
389    pub fn send(&self, msg: Msg) -> Result<(), EventLoopSendError> {
390        self.sender.send(msg).map_err(EventLoopSendError::Send)?;
391        self.poller.notify().map_err(EventLoopSendError::Io)
392    }
393}
394
395/// All of the mutable state needed to run the event loop.
396///
397/// Contains list of items to write, current write state, etc. Anything that
398/// would otherwise be mutated on the `EventLoop` goes here.
399#[derive(Default)]
400pub struct State {
401    write_list: VecDeque<Cow<'static, [u8]>>,
402    writing: Option<Writing>,
403    parser: ansi::Processor,
404}
405
406impl State {
407    #[inline]
408    fn ensure_next(&mut self) {
409        if self.writing.is_none() {
410            self.goto_next();
411        }
412    }
413
414    #[inline]
415    fn goto_next(&mut self) {
416        self.writing = self.write_list.pop_front().map(Writing::new);
417    }
418
419    #[inline]
420    fn take_current(&mut self) -> Option<Writing> {
421        self.writing.take()
422    }
423
424    #[inline]
425    fn needs_write(&self) -> bool {
426        self.writing.is_some() || !self.write_list.is_empty()
427    }
428
429    #[inline]
430    fn set_current(&mut self, new: Option<Writing>) {
431        self.writing = new;
432    }
433}
434
435impl Writing {
436    #[inline]
437    fn new(c: Cow<'static, [u8]>) -> Writing {
438        Writing { source: c, written: 0 }
439    }
440
441    #[inline]
442    fn advance(&mut self, n: usize) {
443        self.written += n;
444    }
445
446    #[inline]
447    fn remaining_bytes(&self) -> &[u8] {
448        &self.source[self.written..]
449    }
450
451    #[inline]
452    fn finished(&self) -> bool {
453        self.written >= self.source.len()
454    }
455}
456
457struct PeekableReceiver<T> {
458    rx: Receiver<T>,
459    peeked: Option<T>,
460}
461
462impl<T> PeekableReceiver<T> {
463    fn new(rx: Receiver<T>) -> Self {
464        Self { rx, peeked: None }
465    }
466
467    fn peek(&mut self) -> Option<&T> {
468        if self.peeked.is_none() {
469            self.peeked = self.rx.try_recv().ok();
470        }
471
472        self.peeked.as_ref()
473    }
474
475    fn recv(&mut self) -> Option<T> {
476        if self.peeked.is_some() {
477            self.peeked.take()
478        } else {
479            match self.rx.try_recv() {
480                Err(TryRecvError::Disconnected) => panic!("event loop channel closed"),
481                res => res.ok(),
482            }
483        }
484    }
485}