Skip to main content

winptyrs/pty/
base.rs

1use windows::core::{Error, HRESULT, PCSTR};
2/// Base struct used to generalize some of the PTY I/O operations.
3use windows::Win32::Foundation::{
4    CloseHandle, ERROR_IO_PENDING, HANDLE, STATUS_PENDING, S_OK, WAIT_FAILED, WAIT_OBJECT_0,
5    WAIT_TIMEOUT,
6};
7use windows::Win32::Globalization::{
8    MultiByteToWideChar, WideCharToMultiByte, CP_UTF8, MULTI_BYTE_TO_WIDE_CHAR_FLAGS,
9};
10use windows::Win32::Storage::FileSystem::{GetFileSizeEx, ReadFile, WriteFile};
11use windows::Win32::System::Pipes::PeekNamedPipe;
12use windows::Win32::System::Threading::{
13    CreateEventExW, SetEvent, WaitForSingleObjectEx, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS,
14    INFINITE,
15};
16use windows::Win32::System::Threading::{GetExitCodeProcess, GetProcessId, WaitForSingleObject};
17use windows::Win32::System::IO::{CancelIoEx, GetOverlappedResult, OVERLAPPED};
18
19use core::ffi::c_void;
20use std::ffi::OsString;
21use std::mem::MaybeUninit;
22use std::ptr;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::mpsc;
25use std::sync::{Arc, Mutex};
26use std::thread;
27
28#[cfg(windows)]
29use std::os::windows::ffi::OsStrExt;
30#[cfg(windows)]
31use std::os::windows::prelude::*;
32#[cfg(unix)]
33use std::vec::IntoIter;
34
35use crossbeam_channel::{unbounded, Sender, Receiver};
36
37use super::PTYArgs;
38
39#[cfg(unix)]
40trait OsStrExt {
41    fn from_wide(x: &[u16]) -> OsString;
42    fn encode_wide(&self) -> IntoIter<u16>;
43}
44
45#[cfg(unix)]
46impl OsStrExt for OsString {
47    fn from_wide(_: &[u16]) -> OsString {
48        return OsString::new();
49    }
50
51    fn encode_wide(&self) -> IntoIter<u16> {
52        return Vec::<u16>::new().into_iter();
53    }
54}
55
56#[derive(Clone, Copy, Debug, PartialEq, Eq)]
57pub struct LocalHandle(pub *mut c_void);
58
59impl LocalHandle {
60    pub fn is_invalid(&self) -> bool {
61        self.0 == -1 as _ || self.0 == 0 as _
62    }
63}
64
65unsafe impl Send for LocalHandle {}
66unsafe impl Sync for LocalHandle {}
67
68impl From<HANDLE> for LocalHandle {
69    fn from(value: HANDLE) -> Self {
70        Self(value.0)
71    }
72}
73
74impl From<LocalHandle> for HANDLE {
75    fn from(value: LocalHandle) -> Self {
76        Self(value.0)
77    }
78}
79
80/// This trait should be implemented by any backend that wants to provide a PTY implementation.
81pub trait PTYImpl: Sync + Send {
82    /// Create a new instance of the PTY backend.
83    ///
84    /// # Arguments
85    /// * `args` - Arguments used to initialize the backend struct.
86    ///
87    /// # Returns
88    /// * `pty`: The instantiated PTY struct.
89    #[allow(clippy::new_ret_no_self)]
90    fn new(args: &PTYArgs) -> Result<Box<dyn PTYImpl>, OsString>
91    where
92        Self: Sized;
93
94    /// Spawn a process inside the PTY.
95    ///
96    /// # Arguments
97    /// * `appname` - Full path to the executable binary to spawn.
98    /// * `cmdline` - Optional space-delimited arguments to provide to the executable.
99    /// * `cwd` - Optional path from where the executable should be spawned.
100    /// * `env` - Optional environment variables to provide to the process. Each
101    /// variable should be declared as `VAR=VALUE` and be separated by a NUL (0) character.
102    ///
103    /// # Returns
104    /// `true` if the call was successful, else an error will be returned.
105    fn spawn(
106        &mut self,
107        appname: OsString,
108        cmdline: Option<OsString>,
109        cwd: Option<OsString>,
110        env: Option<OsString>,
111    ) -> Result<bool, OsString>;
112
113    /// Change the PTY size.
114    ///
115    /// # Arguments
116    /// * `cols` - Number of character columns to display.
117    /// * `rows` - Number of line rows to display.
118    fn set_size(&self, cols: i32, rows: i32) -> Result<(), OsString>;
119
120    /// Read from the process standard output.
121    ///
122    /// # Arguments
123    /// * `blocking` - If true, wait for data to be available. If false, return immediately if no data is available.
124    ///
125    /// # Returns
126    /// * `Ok(OsString)` - The data read from the process output
127    /// * `Err(OsString)` - If EOF is reached or an error occurs
128    ///
129    /// # Notes
130    /// * The actual read operation happens in a background thread with a fixed buffer size
131    /// * The returned data is represented using a [`OsString`] since Windows operates over `u16` strings
132    fn read(&self, blocking: bool) -> Result<OsString, OsString>;
133
134    /// Write a (possibly) UTF-16 string into the standard input of a process.
135    ///
136    /// # Arguments
137    /// * `buf` - [`OsString`] containing the string to write.
138    ///
139    /// # Returns
140    /// The total number of characters written if the call was successful, else
141    /// an [`OsString`] containing an human-readable error.
142    fn write(&self, buf: OsString) -> Result<u32, OsString>;
143
144    /// Check if a process reached End-of-File (EOF).
145    ///
146    /// # Returns
147    /// `true` if the process reached EOL, false otherwise. If an error occurs, then a [`OsString`]
148    /// containing a human-readable error is raised.
149    fn is_eof(&self) -> Result<bool, OsString>;
150
151    /// Retrieve the exit status of the process
152    ///
153    /// # Returns
154    /// `None` if the process has not exited, else the exit code of the process.
155    fn get_exitstatus(&self) -> Result<Option<u32>, OsString>;
156
157    /// Determine if the process is still alive.
158    fn is_alive(&self) -> Result<bool, OsString>;
159
160    /// Retrieve the Process ID associated to the current process.
161    fn get_pid(&self) -> u32;
162
163    /// Retrieve the process handle ID of the spawned program.
164    fn get_fd(&self) -> isize;
165
166    /// Wait for the process to exit/finish.
167    fn wait_for_exit(&self) -> Result<bool, OsString>;
168
169    /// Cancel all pending I/O read operations.
170    fn cancel_io(&self) -> Result<bool, OsString>;
171}
172
173fn read(
174    blocking: bool,
175    stream: HANDLE,
176    using_pipes: bool,
177    lp_overlapped: Option<*mut OVERLAPPED>,
178) -> Result<(OsString, bool), OsString> {
179    let mut result: HRESULT;
180    if !blocking {
181        if using_pipes {
182            let mut bytes_u = MaybeUninit::<u32>::uninit();
183
184            unsafe {
185                let bytes_ptr = ptr::addr_of_mut!(*bytes_u.as_mut_ptr());
186                let bytes_ref = bytes_ptr.as_mut().unwrap();
187
188                result = if PeekNamedPipe(stream, None, 0, Some(bytes_ref), None, None).is_ok() {
189                    S_OK
190                } else {
191                    Error::from_thread().into()
192                };
193
194                if result.is_err() {
195                    let result_msg = result.message();
196                    let string = OsString::from(result_msg);
197                    return Err(string);
198                }
199            }
200        } else {
201            let mut size = MaybeUninit::<i64>::uninit();
202            unsafe {
203                let size_ptr = ptr::addr_of_mut!(*size.as_mut_ptr());
204                let size_ref = size_ptr.as_mut().unwrap();
205                result = if GetFileSizeEx(stream, size_ref).is_ok() {
206                    S_OK
207                } else {
208                    Error::from_thread().into()
209                };
210
211                if result.is_err() {
212                    let result_msg = result.message();
213                    let string = OsString::from(result_msg);
214                    return Err(string);
215                }
216                size.assume_init();
217            }
218        }
219    }
220
221    const BUFFER_SIZE: usize = 32768;
222    let mut buf_vec: Vec<u8> = vec![0u8; BUFFER_SIZE];
223    let mut chars_read = MaybeUninit::<u32>::uninit();
224    let mut awaiting_io = false;
225    unsafe {
226        let chars_read_ptr = ptr::addr_of_mut!(*chars_read.as_mut_ptr());
227        let chars_read_mut = Some(chars_read_ptr);
228        result = if ReadFile(
229            stream,
230            Some(&mut buf_vec[..]),
231            chars_read_mut,
232            lp_overlapped,
233        )
234        .is_ok()
235        {
236            S_OK
237        } else {
238            let err = Error::from_thread();
239            if let None = lp_overlapped {
240                Error::from_thread().into()
241            } else if err.code() != ERROR_IO_PENDING.into() {
242                Error::from_thread().into()
243            } else {
244                awaiting_io = true;
245                S_OK
246            }
247        };
248
249        if result.is_err() {
250            let result_msg = result.message();
251            let string = OsString::from(result_msg);
252            return Err(string);
253        }
254
255        if let Some(overlapped) = lp_overlapped {
256            result = if awaiting_io {
257                // awaiting_io = false;
258                if (*overlapped).Internal == STATUS_PENDING.0 as usize {
259                    if WaitForSingleObjectEx((*overlapped).hEvent, INFINITE, false) != WAIT_OBJECT_0
260                    {
261                        Error::from_thread().into()
262                    } else {
263                        *chars_read_ptr = (*overlapped).InternalHigh as u32;
264                        HRESULT((*overlapped).Internal as i32).into()
265                    }
266                } else {
267                    *chars_read_ptr = (*overlapped).InternalHigh as u32;
268                    HRESULT((*overlapped).Internal as i32).into()
269                }
270            } else {
271                S_OK
272            };
273
274            if result.is_err() {
275                let result_msg = result.message();
276                let string = OsString::from(result_msg);
277                return Err(string);
278            }
279
280            let read_bytes = chars_read.assume_init();
281            if read_bytes == 0 {
282                return Ok((OsString::new(), false));
283            }
284        }
285    }
286
287    // if let Some(true) = awaiting_io {
288    //     return Ok((OsString::new(), awaiting_io));
289    // }
290
291    let mut vec_buf: Vec<u16> = vec![0u16; buf_vec.len()];
292
293    unsafe {
294        MultiByteToWideChar(
295            CP_UTF8,
296            MULTI_BYTE_TO_WIDE_CHAR_FLAGS(0),
297            &buf_vec[..],
298            Some(&mut vec_buf[..]),
299        );
300    }
301
302    let non_zeros_init = Vec::new();
303    let non_zeros: Vec<u16> =
304        vec_buf
305            .split(|x| x == &0)
306            .map(|x| x.to_vec())
307            .fold(non_zeros_init, |mut acc, mut x| {
308                acc.append(&mut x);
309                acc
310            });
311    let os_str = OsString::from_wide(&non_zeros[..]);
312    Ok((os_str, true))
313}
314
315fn is_alive(process: HANDLE) -> Result<bool, OsString> {
316    unsafe {
317        let is_timeout = WaitForSingleObject(process, 0);
318        let succ = is_timeout != WAIT_FAILED;
319
320        if succ {
321            let alive = is_timeout == WAIT_TIMEOUT;
322            Ok(alive)
323        } else {
324            let err: HRESULT = Error::from_thread().into();
325            let result_msg = err.message();
326            let string = OsString::from(result_msg);
327            Err(string)
328        }
329    }
330}
331
332fn wait_for_exit(process: HANDLE) -> Result<bool, OsString> {
333    unsafe {
334        let wait_status = WaitForSingleObject(process, INFINITE);
335        let succ = wait_status != WAIT_FAILED;
336        if succ {
337            let dead = wait_status == WAIT_OBJECT_0;
338            Ok(dead)
339        } else {
340            let err: HRESULT = Error::from_thread().into();
341            let result_msg = err.message();
342            let string = OsString::from(result_msg);
343            Err(string)
344        }
345    }
346}
347
348fn get_exitstatus(process: HANDLE) -> Result<Option<u32>, OsString> {
349    let mut exit = MaybeUninit::<u32>::uninit();
350    unsafe {
351        let exit_ptr: *mut u32 = ptr::addr_of_mut!(*exit.as_mut_ptr());
352        let exit_ref = exit_ptr.as_mut().unwrap();
353        let succ = GetExitCodeProcess(process, exit_ref).is_ok();
354
355        if succ {
356            let actual_exit = *exit_ptr;
357            exit.assume_init();
358            let alive = actual_exit == STATUS_PENDING.0 as u32;
359            let mut exitstatus: Option<u32> = None;
360            if !alive {
361                exitstatus = Some(actual_exit);
362            }
363            Ok(exitstatus)
364        } else {
365            let err: HRESULT = Error::from_thread().into();
366            let result_msg = err.message();
367            let string = OsString::from(result_msg);
368            Err(string)
369        }
370    }
371}
372
373fn is_eof(process: HANDLE, stream: HANDLE) -> Result<bool, OsString> {
374    let mut bytes = MaybeUninit::<u32>::uninit();
375    unsafe {
376        let bytes_ptr: *mut u32 = ptr::addr_of_mut!(*bytes.as_mut_ptr());
377        let bytes_ref = Some(bytes_ptr);
378        let succ = PeekNamedPipe(stream, None, 0, None, bytes_ref, None).is_ok();
379
380        let total_bytes = bytes.assume_init();
381        if succ {
382            match is_alive(process) {
383                Ok(alive) => {
384                    let eof = !alive && total_bytes == 0;
385                    Ok(eof)
386                }
387                Err(_) => Ok(true),
388            }
389        } else {
390            Ok(true)
391        }
392    }
393}
394
395/// This struct handles the I/O operations to the standard streams, as well
396/// the lifetime of a process running inside a PTY.
397pub struct PTYProcess {
398    /// Handle to the process to read from.
399    process: LocalHandle,
400    /// Handle to the standard input stream.
401    conin: LocalHandle,
402    /// Handle to the standard output stream.
403    conout: LocalHandle,
404    /// Identifier of the process running inside the PTY.
405    pid: u32,
406    /// Close process when the struct is dropped.
407    close_process: bool,
408    /// Handle to the thread used to read from the standard output.
409    reading_thread: Option<thread::JoinHandle<()>>,
410    /// Handle to the thread used to check if the process is alive.
411    alive_thread: Option<thread::JoinHandle<()>>,
412    /// Channel used to keep the thread alive.
413    reader_alive: Sender<bool>,
414    /// Atomic variable to signal when a thread finishes
415    reader_atomic: Arc<AtomicBool>,
416    /// Manual-reset event signaled by the reader thread on exit; lets teardown wait
417    /// event-driven instead of busy-looping on `reader_atomic`.
418    reader_exit_event: LocalHandle,
419    /// Channel used to send the process handle to the reading thread.
420    reader_process_out: Sender<Option<LocalHandle>>,
421    /// Atomic flag to signal that the reading process has the process handle.
422    reader_ready: Arc<AtomicBool>,
423    /// Channel used to receive a response from the reading thread.
424    reader_out_rx: Receiver<Option<Result<OsString, OsString>>>,
425    /// PTY process is async
426    async_: bool,
427    /// Writing OVERLAPPED struct for async operation
428    write_overlapped: Option<OVERLAPPED>,
429    /// Write mutex for concurrent access under async IO
430    write_mutex: Arc<Mutex<bool>>,
431}
432
433impl PTYProcess {
434    /// Create a new [`PTYProcess`] instance.
435    ///
436    /// # Arguments
437    /// * `conin` - Handle to the process standard input stream
438    /// * `conout` - Handle to the process standard output stream
439    /// * `using_pipes` - `true` if the streams are Windows named pipes, `false` if they are files.
440    /// * `async_` - `true` if the streams are async, `false` if they are sync.
441    ///
442    /// # Returns
443    /// * `pty` - A new [`PTYProcess`] instance.
444    pub fn new(
445        conin: LocalHandle,
446        conout: LocalHandle,
447        using_pipes: bool,
448        async_: bool,
449        cleanup_tx: Option<mpsc::Sender<bool>>,
450    ) -> PTYProcess {
451        let thread_arc = Arc::new(AtomicBool::new(true));
452        let reader_arc = Arc::new(AtomicBool::new(false));
453
454        // Manual-reset event signaled by the reader thread when it exits. Teardown
455        // waits on this (event-driven) instead of busy-polling `thread_arc`.
456        let reader_exit_handle = unsafe {
457            match CreateEventExW(None, None, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS.0) {
458                Ok(evt) => evt,
459                Err(_) => HANDLE::default(),
460            }
461        };
462        let reader_exit_event = LocalHandle::from(reader_exit_handle);
463
464        if !async_ {
465            // Keep only the reading thread channel
466            let (reader_out_tx, reader_out_rx) =
467                unbounded::
468                <Option<Result<OsString, OsString>>>();
469            let (reader_alive_tx, reader_alive_rx) = unbounded::<bool>();
470            let (reader_process_tx, reader_process_rx) = unbounded::<Option<LocalHandle>>();
471            let spinlock_clone = Arc::clone(&thread_arc);
472            let reader_ready = Arc::clone(&reader_arc);
473            let reader_exit_for_thread = reader_exit_event;
474
475            let reader_thread = thread::spawn(move || {
476                let process_result = reader_process_rx.recv();
477                reader_ready.store(true, Ordering::Release);
478                if let Ok(Some(process)) = process_result {
479                    let mut alive = reader_alive_rx
480                        .try_recv()
481                        .unwrap_or(true);
482                    while alive
483                    {
484                        if !is_eof(process.into(), conout.into()).unwrap() {
485                            match read(true, conout.into(), using_pipes, None) {
486                                Ok((result, _)) => {
487                                    reader_out_tx.send(Some(Ok(result))).unwrap();
488                                }
489                                Err(err) => {
490                                    reader_out_tx.send(Some(Err(err))).unwrap();
491                                }
492                            }
493                            alive = reader_alive_rx
494                        .try_recv()
495                        .unwrap_or(true);
496                        } else {
497                            reader_out_tx.send(None).unwrap();
498                            alive = false;
499                        }
500                    }
501                }
502                spinlock_clone.store(false, Ordering::Release);
503
504                unsafe {
505                    let _ = SetEvent(Into::<HANDLE>::into(reader_exit_for_thread));
506                }
507
508                drop(reader_process_rx);
509                drop(reader_alive_rx);
510                drop(reader_out_tx);
511            });
512
513            PTYProcess {
514                process: LocalHandle(std::ptr::null_mut()),
515                conin,
516                conout,
517                pid: 0,
518                close_process: true,
519                reading_thread: Some(reader_thread),
520                alive_thread: None,
521                reader_alive: reader_alive_tx,
522                reader_atomic: thread_arc,
523                reader_exit_event,
524                reader_process_out: reader_process_tx,
525                reader_ready: reader_arc,
526                reader_out_rx,
527                async_,
528                write_overlapped: None,
529                write_mutex: Arc::new(Mutex::new(false)),
530            }
531        } else {
532            let mut write_overlapped = OVERLAPPED::default();
533            unsafe {
534                match CreateEventExW(None, None, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS.0) {
535                    Ok(evt) => {
536                        write_overlapped.hEvent = evt;
537                    }
538
539                    Err(_) => (),
540                }
541            }
542
543            let (reader_out_tx, reader_out_rx) =
544                unbounded::<Option<Result<OsString, OsString>>>();
545            let (reader_alive_tx, reader_alive_rx) = unbounded::<bool>();
546            let (reader_process_tx, reader_process_rx) = unbounded::<Option<LocalHandle>>();
547            let spinlock_clone = Arc::clone(&thread_arc);
548            let reader_ready = Arc::clone(&reader_arc);
549            let (reader_process_2_tx, reader_process_2_rx) = unbounded::<LocalHandle>();
550            let reader_exit_for_thread = reader_exit_event;
551            let reader_exit_for_alive = reader_exit_event;
552
553            let reader_thread = thread::spawn(move || {
554                let mut read_overlapped = OVERLAPPED::default();
555                unsafe {
556                    match CreateEventExW(None, None, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS.0)
557                    {
558                        Ok(evt) => {
559                            read_overlapped.hEvent = evt;
560                        }
561
562                        Err(_) => (),
563                    }
564                }
565
566                let process_result = reader_process_rx.recv();
567                reader_ready.store(true, Ordering::Release);
568                if let Ok(Some(process)) = process_result {
569                    let _ = reader_process_2_tx.send(process);
570                    let mut alive = true;
571                    while alive {
572                        match read(true, conout.into(), using_pipes, Some(&mut read_overlapped)) {
573                            Ok((result, alive_status)) => {
574                                reader_out_tx.send(Some(Ok(result))).unwrap();
575                                alive = alive_status;
576                            }
577                            Err(err) => {
578                                reader_out_tx.send(Some(Err(err))).unwrap();
579                                alive = false;
580                            }
581                        }
582                    }
583
584                    unsafe {
585                        let _ = CloseHandle(read_overlapped.hEvent);
586                    }
587                }
588                spinlock_clone.store(false, Ordering::Release);
589
590                unsafe {
591                    let _ = SetEvent(Into::<HANDLE>::into(reader_exit_for_thread));
592                }
593
594                drop(reader_process_rx);
595                drop(reader_alive_rx);
596                drop(reader_out_tx);
597                drop(reader_process_2_tx);
598            });
599
600            let alive_thread = thread::spawn(move || {
601                if let Ok(handle) = reader_process_2_rx.recv() {
602                    let _ = wait_for_exit(handle.into());
603                    unsafe {
604                        // Child has exited. Let modern ConPTY auto-close the output
605                        // pipe — reader's pending ReadFile then returns 0 bytes
606                        // (natural EOF) and the reader exits, signaling
607                        // `reader_exit_event`. If that doesn't happen in time
608                        // (older ConPTY / non-ConPTY consumer), fall back to
609                        // CancelIoEx to unstick the reader.
610                        let exit_handle = Into::<HANDLE>::into(reader_exit_for_alive);
611                        if WaitForSingleObject(exit_handle, 5000) != WAIT_OBJECT_0 {
612                            let _ = CancelIoEx(Into::<HANDLE>::into(conout), None);
613                            loop {
614                                if WaitForSingleObject(exit_handle, 50) == WAIT_OBJECT_0 {
615                                    break;
616                                }
617                                let _ = CancelIoEx(Into::<HANDLE>::into(conout), None);
618                            }
619                        }
620                        // Reader has drained and exited. Signal cleanup so the
621                        // consumer (e.g. ConPTY's cleanup_thread) can release
622                        // resources via ClosePseudoConsole.
623                        if let Some(tx) = cleanup_tx {
624                            let _ = tx.send(true).unwrap_or(());
625                        }
626                    }
627                }
628                drop(reader_process_2_rx);
629            });
630
631            PTYProcess {
632                process: LocalHandle(std::ptr::null_mut()),
633                conin,
634                conout,
635                pid: 0,
636                close_process: true,
637                reading_thread: Some(reader_thread),
638                alive_thread: Some(alive_thread),
639                reader_alive: reader_alive_tx,
640                reader_atomic: thread_arc,
641                reader_exit_event,
642                reader_process_out: reader_process_tx,
643                reader_ready: reader_arc,
644                reader_out_rx,
645                async_,
646                write_overlapped: Some(write_overlapped),
647                write_mutex: Arc::new(Mutex::new(false)),
648            }
649        }
650    }
651
652    /// Read from the process standard output.
653    ///
654    /// # Arguments
655    /// * `blocking` - If true, wait for data to be available. If false, return immediately if no data is available.
656    ///
657    /// # Returns
658    /// * `Ok(OsString)` - The data read from the process output
659    /// * `Err(OsString)` - If EOF is reached or an error occurs
660    ///
661    /// # Notes
662    /// * The actual read operation happens in a background thread with a fixed buffer size
663    /// * The returned data is represented using a [`OsString`] since Windows operates over `u16` strings
664    pub fn read(&self, blocking: bool) -> Result<OsString, OsString> {
665        // Get data directly from reading thread
666        match blocking {
667            true => match self.reader_out_rx.recv() {
668                Ok(None) => Err(OsString::from("Standard out reached EOF")),
669                Ok(Some(bytes)) => bytes,
670                Err(_) => Ok(OsString::new()),
671            },
672            false => match self.reader_out_rx.try_recv() {
673                Ok(None) => Err(OsString::from("Standard out reached EOF")),
674                Ok(Some(bytes)) => bytes,
675                Err(_) => Ok(OsString::new()),
676            },
677        }
678    }
679
680    /// Write an (possibly) UTF-16 string into the standard input of a process.
681    ///
682    /// # Arguments
683    /// * `buf` - [`OsString`] containing the string to write.
684    ///
685    /// # Returns
686    /// The total number of characters written if the call was successful, else
687    /// an [`OsString`] containing an human-readable error.
688    pub fn write(&self, buf: OsString) -> Result<u32, OsString> {
689        const BUFFER_SIZE: usize = 8192;
690        let vec_buf: Vec<u16> = buf.encode_wide().collect();
691
692        unsafe {
693            let required_size = WideCharToMultiByte(
694                CP_UTF8,
695                0,
696                &vec_buf[..],
697                None,
698                PCSTR(ptr::null_mut::<u8>()),
699                None,
700            );
701
702            let mut bytes_buf: Vec<u8> = std::iter::repeat(0)
703                .take((required_size) as usize)
704                .collect();
705
706            WideCharToMultiByte(
707                CP_UTF8,
708                0,
709                &vec_buf[..],
710                Some(&mut bytes_buf[..]),
711                PCSTR(ptr::null_mut::<u8>()),
712                None,
713            );
714
715            let mut total_written = 0u32;
716            let mut bytes_written = MaybeUninit::<u32>::uninit();
717            let bytes_ptr: *mut u32 = ptr::addr_of_mut!(*bytes_written.as_mut_ptr());
718            let bytes_ref = Some(bytes_ptr);
719
720            let c_mutex = Arc::clone(&self.write_mutex);
721            let mut write_pending = c_mutex.lock().unwrap();
722
723            // Write in chunks
724            for chunk in bytes_buf.chunks(BUFFER_SIZE) {
725                if self.async_ {
726                    if *write_pending {
727                        *write_pending = false;
728                        if GetOverlappedResult(
729                            Into::<HANDLE>::into(self.conin),
730                            &mut self.write_overlapped.unwrap(),
731                            bytes_ptr,
732                            true,
733                        )
734                        .is_err()
735                        {
736                            let err: HRESULT = Error::from_thread().into();
737                            let result_msg = err.message();
738                            let string = OsString::from(result_msg);
739                            return Err(string);
740                        } else {
741                            total_written += bytes_written.assume_init();
742                        }
743                    }
744
745                    let write_result = if WriteFile(
746                        Into::<HANDLE>::into(self.conin),
747                        Some(chunk),
748                        bytes_ref,
749                        Some(&mut self.write_overlapped.unwrap()),
750                    )
751                    .is_ok()
752                    {
753                        S_OK
754                    } else {
755                        let err = Error::from_thread();
756                        if err.code() == ERROR_IO_PENDING.into() {
757                            *write_pending = true;
758                            S_OK
759                        } else {
760                            Error::from_thread().into()
761                        }
762                    };
763
764                    if write_result.is_err() {
765                        let result_msg = write_result.message();
766                        let string = OsString::from(result_msg);
767                        return Err(string);
768                    }
769                } else {
770                    let write_result = if WriteFile(
771                        Into::<HANDLE>::into(self.conin),
772                        Some(chunk),
773                        bytes_ref,
774                        None,
775                    )
776                    .is_ok()
777                    {
778                        S_OK
779                    } else {
780                        Error::from_thread().into()
781                    };
782                    if write_result.is_err() {
783                        let result_msg = write_result.message();
784                        let string = OsString::from(result_msg);
785                        return Err(string);
786                    }
787                    total_written += bytes_written.assume_init();
788                }
789            }
790            Ok(total_written)
791        }
792    }
793
794    /// Check if a process reached End-of-File (EOF).
795    ///
796    /// # Returns
797    /// `true` if the process reached EOL, false otherwise. If an error occurs, then a [`OsString`]
798    /// containing a human-readable error is raised.
799    pub fn is_eof(&self) -> Result<bool, OsString> {
800        // let mut available_bytes: Box<u32> = Box::new_uninit();
801        // let bytes_ptr: *mut u32 = &mut *available_bytes;
802        // let bytes_ptr: *mut u32 = ptr::null_mut();
803        let mut bytes = MaybeUninit::<u32>::uninit();
804        unsafe {
805            let bytes_ptr: *mut u32 = ptr::addr_of_mut!(*bytes.as_mut_ptr());
806            let bytes_ref = Some(bytes_ptr);
807            let mut succ = PeekNamedPipe(
808                Into::<HANDLE>::into(self.conout),
809                None,
810                0,
811                bytes_ref,
812                None,
813                None,
814            )
815            .is_ok();
816
817            let _total_bytes = bytes.assume_init();
818
819            let is_alive = match self.is_alive() {
820                Ok(alive) => {
821                    alive || !self.reader_out_rx.is_empty()
822                },
823                Err(err) => {
824                    return Err(err);
825                }
826            };
827
828            succ = succ || is_alive || self.reader_atomic.load(Ordering::Acquire);
829            Ok(!succ)
830        }
831    }
832
833    /// Retrieve the exit status of the process
834    ///
835    /// # Returns
836    /// `None` if the process has not exited, else the exit code of the process.
837    pub fn get_exitstatus(&self) -> Result<Option<u32>, OsString> {
838        if self.pid == 0 {
839            return Ok(None);
840        }
841
842        match get_exitstatus(self.process.into()) {
843            Ok(exitstatus) => Ok(exitstatus),
844            Err(err) => Err(err),
845        }
846    }
847
848    /// Determine if the process is still alive.
849    pub fn is_alive(&self) -> Result<bool, OsString> {
850        // let mut exit_code: Box<u32> = Box::new_uninit();
851        // let exit_ptr: *mut u32 = &mut *exit_code;
852        match is_alive(self.process.into()) {
853            Ok(alive) => Ok(alive),
854            Err(err) => Err(err),
855        }
856    }
857
858    /// Set the running process behind the PTY.
859    pub fn set_process(&mut self, process: HANDLE, close_process: bool) {
860        self.process = process.into();
861        self.close_process = close_process;
862
863        // if env::var_os("CONPTY_CI").is_some() {
864        //     // For some reason, the CI requires a flush of the handle before
865        //     // reading from a thread.
866        //     let result = read(4096, true, self.conout, false).unwrap();
867        //     println!("{:?}", result);
868        //     let result = read(4096, true, self.conout, false).unwrap();
869        //     println!("{:?}", result);
870        //     let res: Result<u32, OsString> = self.write(OsString::from("\r\n\r\n"));
871        //     res.unwrap();
872        // }
873
874        self.reader_process_out.send(Some(process.into())).unwrap();
875        unsafe {
876            self.pid = GetProcessId(Into::<HANDLE>::into(self.process));
877        }
878    }
879
880    /// Retrieve the Process ID associated to the current process.
881    pub fn get_pid(&self) -> u32 {
882        self.pid
883    }
884
885    /// Retrieve the process handle ID of the spawned program.
886    pub fn get_fd(&self) -> isize {
887        self.process.0 as isize
888    }
889
890    /// Wait for the process to exit
891    pub fn wait_for_exit(&self) -> Result<bool, OsString> {
892        wait_for_exit(self.process.into())
893    }
894
895    /// Cancel all pending I/O operations
896    pub fn cancel_io(&self) -> Result<bool, OsString> {
897        unsafe {
898            if CancelIoEx(Into::<HANDLE>::into(self.conout), None).is_ok() {
899                Ok(true)
900            } else {
901                let result: HRESULT = Error::from_thread().into();
902                let result_msg = result.message();
903                let string = OsString::from(result_msg);
904                Err(string)
905            }
906        }
907    }
908}
909
910impl Drop for PTYProcess {
911    fn drop(&mut self) {
912        unsafe {
913            while !self.reader_ready.load(Ordering::Acquire) {
914                // Unblock thread if it is waiting for a process handle.
915                if self.reader_process_out.send(None).is_ok() {}
916            }
917
918            // Signal the sync reader (which polls the channel) that it should exit,
919            // and cancel any pending I/O so a blocked ReadFile returns. Then wait
920            // event-driven for the reader to actually exit. Retry on timeout to
921            // handle the race where the reader issues a new read between cancel
922            // and wait.
923            let _ = self.reader_alive.send(false);
924            let _ = CancelIoEx(Into::<HANDLE>::into(self.conout), None);
925
926            let exit_handle = Into::<HANDLE>::into(self.reader_exit_event);
927            if !self.reader_exit_event.is_invalid() {
928                loop {
929                    if WaitForSingleObject(exit_handle, 50) == WAIT_OBJECT_0 {
930                        break;
931                    }
932                    let _ = self.reader_alive.send(false);
933                    let _ = CancelIoEx(Into::<HANDLE>::into(self.conout), None);
934                }
935            }
936
937            // Wait for the thread to be down
938            if let Some(thread_handle) = self.reading_thread.take() {
939                thread_handle.join().unwrap();
940            }
941
942            if !self.conin.is_invalid() {
943                let _ = CloseHandle(Into::<HANDLE>::into(self.conin));
944            }
945
946            if !self.conout.is_invalid() && !self.async_ {
947                let _ = CloseHandle(Into::<HANDLE>::into(self.conout));
948            }
949
950            if self.close_process && !self.process.is_invalid() {
951                let _ = CloseHandle(Into::<HANDLE>::into(self.process));
952            }
953
954            if let Some(thread_handle) = self.alive_thread.take() {
955                thread_handle.join().unwrap_or(());
956            }
957
958            if !self.reader_exit_event.is_invalid() {
959                let _ = CloseHandle(exit_handle);
960            }
961        }
962    }
963}