1use std::borrow::Cow;
4use std::collections::VecDeque;
5use std::fmt::{self, Display, Formatter};
6use std::fs::File;
7use std::io::{self, ErrorKind, Read, Write};
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
11use std::thread::JoinHandle;
12use std::time::Instant;
13
14use log::error;
15use polling::{Event as PollingEvent, Events, PollMode};
16
17use crate::event::{self, Event, EventListener, WindowSize};
18use crate::sync::FairMutex;
19use crate::term::Term;
20use crate::{thread, tty};
21use vte::ansi;
22
23pub(crate) const READ_BUFFER_SIZE: usize = 0x10_0000;
25
26const MAX_LOCKED_READ: usize = u16::MAX as usize;
28
29#[derive(Debug)]
31pub enum Msg {
32 Input(Cow<'static, [u8]>),
34
35 Shutdown,
37
38 Resize(WindowSize),
40}
41
42pub struct EventLoop<T: tty::EventedPty, U: EventListener> {
47 poll: Arc<polling::Poller>,
48 pty: T,
49 rx: PeekableReceiver<Msg>,
50 tx: Sender<Msg>,
51 terminal: Arc<FairMutex<Term<U>>>,
52 event_proxy: U,
53 drain_on_exit: bool,
54 ref_test: bool,
55}
56
57impl<T, U> EventLoop<T, U>
58where
59 T: tty::EventedPty + event::OnResize + Send + 'static,
60 U: EventListener + Send + 'static,
61{
62 pub fn new(
64 terminal: Arc<FairMutex<Term<U>>>,
65 event_proxy: U,
66 pty: T,
67 drain_on_exit: bool,
68 ref_test: bool,
69 ) -> io::Result<EventLoop<T, U>> {
70 let (tx, rx) = mpsc::channel();
71 let poll = polling::Poller::new()?.into();
72 Ok(EventLoop {
73 poll,
74 pty,
75 tx,
76 rx: PeekableReceiver::new(rx),
77 terminal,
78 event_proxy,
79 drain_on_exit,
80 ref_test,
81 })
82 }
83
84 pub fn channel(&self) -> EventLoopSender {
85 EventLoopSender { sender: self.tx.clone(), poller: self.poll.clone() }
86 }
87
88 fn drain_recv_channel(&mut self, state: &mut State) -> bool {
92 while let Some(msg) = self.rx.recv() {
93 match msg {
94 Msg::Input(input) => state.write_list.push_back(input),
95 Msg::Resize(window_size) => self.pty.on_resize(window_size),
96 Msg::Shutdown => return false,
97 }
98 }
99
100 true
101 }
102
103 #[inline]
104 fn pty_read<X>(
105 &mut self,
106 state: &mut State,
107 buf: &mut [u8],
108 mut writer: Option<&mut X>,
109 ) -> io::Result<()>
110 where
111 X: Write,
112 {
113 let mut unprocessed = 0;
114 let mut processed = 0;
115
116 let _terminal_lease = Some(self.terminal.lease());
118 let mut terminal = None;
119
120 loop {
121 match self.pty.reader().read(&mut buf[unprocessed..]) {
123 Ok(0) if unprocessed == 0 => break,
125 Ok(got) => unprocessed += got,
126 Err(err) => match err.kind() {
127 ErrorKind::Interrupted | ErrorKind::WouldBlock => {
128 if unprocessed == 0 {
130 break;
131 }
132 },
133 _ => return Err(err),
134 },
135 }
136
137 let terminal = match &mut terminal {
139 Some(terminal) => terminal,
140 None => terminal.insert(match self.terminal.try_lock_unfair() {
141 None if unprocessed >= READ_BUFFER_SIZE => self.terminal.lock_unfair(),
143 None => continue,
144 Some(terminal) => terminal,
145 }),
146 };
147
148 if let Some(writer) = &mut writer {
150 writer.write_all(&buf[..unprocessed]).unwrap();
151 }
152
153 state.parser.advance(&mut **terminal, &buf[..unprocessed]);
155
156 processed += unprocessed;
157 unprocessed = 0;
158
159 if processed >= MAX_LOCKED_READ {
161 break;
162 }
163 }
164
165 if state.parser.sync_bytes_count() < processed && processed > 0 {
167 self.event_proxy.send_event(Event::Wakeup);
168 }
169
170 Ok(())
171 }
172
173 #[inline]
174 fn pty_write(&mut self, state: &mut State) -> io::Result<()> {
175 state.ensure_next();
176
177 'write_many: while let Some(mut current) = state.take_current() {
178 'write_one: loop {
179 match self.pty.writer().write(current.remaining_bytes()) {
180 Ok(0) => {
181 state.set_current(Some(current));
182 break 'write_many;
183 },
184 Ok(n) => {
185 current.advance(n);
186 if current.finished() {
187 state.goto_next();
188 break 'write_one;
189 }
190 },
191 Err(err) => {
192 state.set_current(Some(current));
193 match err.kind() {
194 ErrorKind::Interrupted | ErrorKind::WouldBlock => break 'write_many,
195 _ => return Err(err),
196 }
197 },
198 }
199 }
200 }
201
202 Ok(())
203 }
204
205 pub fn spawn(mut self) -> JoinHandle<(Self, State)> {
206 thread::spawn_named("PTY reader", move || {
207 let mut state = State::default();
208 let mut buf = [0u8; READ_BUFFER_SIZE];
209
210 let poll_opts = PollMode::Level;
211 let mut interest = PollingEvent::readable(0);
212
213 if let Err(err) = unsafe { self.pty.register(&self.poll, interest, poll_opts) } {
215 error!("Event loop registration error: {err}");
216 return (self, state);
217 }
218
219 let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap());
220
221 let mut pipe = if self.ref_test {
222 Some(File::create("./alacritty.recording").expect("create alacritty recording"))
223 } else {
224 None
225 };
226
227 'event_loop: loop {
228 let handler = state.parser.sync_timeout();
230 let timeout =
231 handler.sync_timeout().map(|st| st.saturating_duration_since(Instant::now()));
232
233 events.clear();
234 if let Err(err) = self.poll.wait(&mut events, timeout) {
235 match err.kind() {
236 ErrorKind::Interrupted => continue,
237 _ => {
238 error!("Event loop polling error: {err}");
239 break 'event_loop;
240 },
241 }
242 }
243
244 if events.is_empty() && self.rx.peek().is_none() {
246 state.parser.stop_sync(&mut *self.terminal.lock());
247 self.event_proxy.send_event(Event::Wakeup);
248 continue;
249 }
250
251 if !self.drain_recv_channel(&mut state) {
253 break;
254 }
255
256 for event in events.iter() {
257 match event.key {
258 tty::PTY_CHILD_EVENT_TOKEN => {
259 if let Some(tty::ChildEvent::Exited(code)) = self.pty.next_child_event()
260 {
261 if let Some(code) = code {
262 self.event_proxy.send_event(Event::ChildExit(code));
263 }
264 if self.drain_on_exit {
265 let _ = self.pty_read(&mut state, &mut buf, pipe.as_mut());
266 }
267 self.terminal.lock().exit();
268 self.event_proxy.send_event(Event::Wakeup);
269 break 'event_loop;
270 }
271 },
272
273 tty::PTY_READ_WRITE_TOKEN => {
274 if event.is_interrupt() {
275 continue;
277 }
278
279 if event.readable {
280 if let Err(err) = self.pty_read(&mut state, &mut buf, pipe.as_mut())
281 {
282 #[cfg(target_os = "linux")]
288 if err.raw_os_error() == Some(libc::EIO) {
289 continue;
290 }
291
292 error!("Error reading from PTY in event loop: {err}");
293 break 'event_loop;
294 }
295 }
296
297 if event.writable {
298 if let Err(err) = self.pty_write(&mut state) {
299 error!("Error writing to PTY in event loop: {err}");
300 break 'event_loop;
301 }
302 }
303 },
304 _ => (),
305 }
306 }
307
308 let needs_write = state.needs_write();
310 if needs_write != interest.writable {
311 interest.writable = needs_write;
312
313 self.pty.reregister(&self.poll, interest, poll_opts).unwrap();
315 }
316 }
317
318 let _ = self.pty.deregister(&self.poll);
320
321 (self, state)
322 })
323 }
324}
325
326struct Writing {
328 source: Cow<'static, [u8]>,
329 written: usize,
330}
331
332pub struct Notifier(pub EventLoopSender);
333
334impl event::Notify for Notifier {
335 fn notify<B>(&self, bytes: B)
336 where
337 B: Into<Cow<'static, [u8]>>,
338 {
339 let bytes = bytes.into();
340 if bytes.is_empty() {
342 return;
343 }
344
345 let _ = self.0.send(Msg::Input(bytes));
346 }
347}
348
349impl event::OnResize for Notifier {
350 fn on_resize(&mut self, window_size: WindowSize) {
351 let _ = self.0.send(Msg::Resize(window_size));
352 }
353}
354
355#[derive(Debug)]
356pub enum EventLoopSendError {
357 Io(io::Error),
359
360 Send(mpsc::SendError<Msg>),
362}
363
364impl Display for EventLoopSendError {
365 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
366 match self {
367 EventLoopSendError::Io(err) => err.fmt(f),
368 EventLoopSendError::Send(err) => err.fmt(f),
369 }
370 }
371}
372
373impl std::error::Error for EventLoopSendError {
374 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
375 match self {
376 EventLoopSendError::Io(err) => err.source(),
377 EventLoopSendError::Send(err) => err.source(),
378 }
379 }
380}
381
382#[derive(Clone)]
383pub struct EventLoopSender {
384 sender: Sender<Msg>,
385 poller: Arc<polling::Poller>,
386}
387
388impl EventLoopSender {
389 pub fn send(&self, msg: Msg) -> Result<(), EventLoopSendError> {
390 self.sender.send(msg).map_err(EventLoopSendError::Send)?;
391 self.poller.notify().map_err(EventLoopSendError::Io)
392 }
393}
394
395#[derive(Default)]
400pub struct State {
401 write_list: VecDeque<Cow<'static, [u8]>>,
402 writing: Option<Writing>,
403 parser: ansi::Processor,
404}
405
406impl State {
407 #[inline]
408 fn ensure_next(&mut self) {
409 if self.writing.is_none() {
410 self.goto_next();
411 }
412 }
413
414 #[inline]
415 fn goto_next(&mut self) {
416 self.writing = self.write_list.pop_front().map(Writing::new);
417 }
418
419 #[inline]
420 fn take_current(&mut self) -> Option<Writing> {
421 self.writing.take()
422 }
423
424 #[inline]
425 fn needs_write(&self) -> bool {
426 self.writing.is_some() || !self.write_list.is_empty()
427 }
428
429 #[inline]
430 fn set_current(&mut self, new: Option<Writing>) {
431 self.writing = new;
432 }
433}
434
435impl Writing {
436 #[inline]
437 fn new(c: Cow<'static, [u8]>) -> Writing {
438 Writing { source: c, written: 0 }
439 }
440
441 #[inline]
442 fn advance(&mut self, n: usize) {
443 self.written += n;
444 }
445
446 #[inline]
447 fn remaining_bytes(&self) -> &[u8] {
448 &self.source[self.written..]
449 }
450
451 #[inline]
452 fn finished(&self) -> bool {
453 self.written >= self.source.len()
454 }
455}
456
457struct PeekableReceiver<T> {
458 rx: Receiver<T>,
459 peeked: Option<T>,
460}
461
462impl<T> PeekableReceiver<T> {
463 fn new(rx: Receiver<T>) -> Self {
464 Self { rx, peeked: None }
465 }
466
467 fn peek(&mut self) -> Option<&T> {
468 if self.peeked.is_none() {
469 self.peeked = self.rx.try_recv().ok();
470 }
471
472 self.peeked.as_ref()
473 }
474
475 fn recv(&mut self) -> Option<T> {
476 if self.peeked.is_some() {
477 self.peeked.take()
478 } else {
479 match self.rx.try_recv() {
480 Err(TryRecvError::Disconnected) => panic!("event loop channel closed"),
481 res => res.ok(),
482 }
483 }
484 }
485}