1use windows::core::{Error, HRESULT, PCSTR};
2use windows::Win32::Foundation::{
4 CloseHandle, ERROR_IO_PENDING, HANDLE, STATUS_PENDING, S_OK, WAIT_FAILED, WAIT_OBJECT_0,
5 WAIT_TIMEOUT,
6};
7use windows::Win32::Globalization::{
8 MultiByteToWideChar, WideCharToMultiByte, CP_UTF8, MULTI_BYTE_TO_WIDE_CHAR_FLAGS,
9};
10use windows::Win32::Storage::FileSystem::{GetFileSizeEx, ReadFile, WriteFile};
11use windows::Win32::System::Pipes::PeekNamedPipe;
12use windows::Win32::System::Threading::{
13 CreateEventExW, SetEvent, WaitForSingleObjectEx, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS,
14 INFINITE,
15};
16use windows::Win32::System::Threading::{GetExitCodeProcess, GetProcessId, WaitForSingleObject};
17use windows::Win32::System::IO::{CancelIoEx, GetOverlappedResult, OVERLAPPED};
18
19use core::ffi::c_void;
20use std::ffi::OsString;
21use std::mem::MaybeUninit;
22use std::ptr;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::mpsc;
25use std::sync::{Arc, Mutex};
26use std::thread;
27
28#[cfg(windows)]
29use std::os::windows::ffi::OsStrExt;
30#[cfg(windows)]
31use std::os::windows::prelude::*;
32#[cfg(unix)]
33use std::vec::IntoIter;
34
35use crossbeam_channel::{unbounded, Sender, Receiver};
36
37use super::PTYArgs;
38
/// Unix-only stand-in for the Windows wide-string extension methods, so code calling
/// `from_wide` / `encode_wide` still type-checks on Unix.
///
/// Both methods are inert placeholders: no UTF-16 conversion is performed.
#[cfg(unix)]
trait OsStrExt {
    /// Placeholder for building an `OsString` from UTF-16 code units.
    fn from_wide(x: &[u16]) -> OsString;
    /// Placeholder for iterating over the UTF-16 code units of a string.
    fn encode_wide(&self) -> IntoIter<u16>;
}

#[cfg(unix)]
impl OsStrExt for OsString {
    /// Ignores its input and always yields an empty `OsString`.
    fn from_wide(_: &[u16]) -> OsString {
        OsString::default()
    }

    /// Always yields an empty iterator, whatever the string contains.
    fn encode_wide(&self) -> IntoIter<u16> {
        let no_units: Vec<u16> = Vec::new();
        no_units.into_iter()
    }
}
55
/// Thin, copyable wrapper around a raw handle value stored as an untyped pointer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LocalHandle(pub *mut c_void);

impl LocalHandle {
    /// Returns `true` when the wrapped value is null (`0`) or the all-ones sentinel (`-1`).
    pub fn is_invalid(&self) -> bool {
        matches!(self.0 as isize, 0 | -1)
    }
}
64
// NOTE(review): these impls assert that a `LocalHandle` may be moved to and shared between
// threads even though it wraps a raw pointer. Presumably the pointer is only ever an opaque
// OS handle value that is never dereferenced — confirm before relying on it.
unsafe impl Send for LocalHandle {}
unsafe impl Sync for LocalHandle {}
67
68impl From<HANDLE> for LocalHandle {
69 fn from(value: HANDLE) -> Self {
70 Self(value.0)
71 }
72}
73
74impl From<LocalHandle> for HANDLE {
75 fn from(value: LocalHandle) -> Self {
76 Self(value.0)
77 }
78}
79
/// Common interface implemented by the PTY backends.
///
/// Implementors must be `Send + Sync`. Every fallible method reports failure as an
/// `OsString` message.
pub trait PTYImpl: Sync + Send {
    /// Builds a backend from `args` and returns it boxed as a trait object.
    // The constructor returns `Box<dyn PTYImpl>` rather than `Self`, hence the lint allowance.
    #[allow(clippy::new_ret_no_self)]
    fn new(args: &PTYArgs) -> Result<Box<dyn PTYImpl>, OsString>
    where
        Self: Sized;

    /// Spawns a process attached to this PTY.
    ///
    /// `appname` is required; `cmdline`, `cwd` and `env` are optional. Presumably they are
    /// the command line, working directory and environment block of the new process — the
    /// exact formats are defined by the implementor (TODO confirm).
    fn spawn(
        &mut self,
        appname: OsString,
        cmdline: Option<OsString>,
        cwd: Option<OsString>,
        env: Option<OsString>,
    ) -> Result<bool, OsString>;

    /// Changes the PTY dimensions to `cols` columns by `rows` rows.
    fn set_size(&self, cols: i32, rows: i32) -> Result<(), OsString>;

    /// Reads pending output; `blocking` selects whether the call may wait for data.
    fn read(&self, blocking: bool) -> Result<OsString, OsString>;

    /// Writes `buf` to the PTY input and returns a count reported by the backend
    /// (`PTYProcess::write` returns the number of bytes written).
    fn write(&self, buf: OsString) -> Result<u32, OsString>;

    /// Reports whether the output stream has reached end-of-file.
    fn is_eof(&self) -> Result<bool, OsString>;

    /// Returns the exit status of the process, or `None` when no status is available yet.
    fn get_exitstatus(&self) -> Result<Option<u32>, OsString>;

    /// Reports whether the process is still running.
    fn is_alive(&self) -> Result<bool, OsString>;

    /// Returns the process identifier.
    fn get_pid(&self) -> u32;

    /// Returns a handle value as an integer (`PTYProcess::get_fd` returns the raw process
    /// handle).
    fn get_fd(&self) -> isize;

    /// Waits for the process to exit.
    fn wait_for_exit(&self) -> Result<bool, OsString>;

    /// Cancels pending I/O on the PTY.
    fn cancel_io(&self) -> Result<bool, OsString>;
}
172
173fn read(
174 blocking: bool,
175 stream: HANDLE,
176 using_pipes: bool,
177 lp_overlapped: Option<*mut OVERLAPPED>,
178) -> Result<(OsString, bool), OsString> {
179 let mut result: HRESULT;
180 if !blocking {
181 if using_pipes {
182 let mut bytes_u = MaybeUninit::<u32>::uninit();
183
184 unsafe {
185 let bytes_ptr = ptr::addr_of_mut!(*bytes_u.as_mut_ptr());
186 let bytes_ref = bytes_ptr.as_mut().unwrap();
187
188 result = if PeekNamedPipe(stream, None, 0, Some(bytes_ref), None, None).is_ok() {
189 S_OK
190 } else {
191 Error::from_thread().into()
192 };
193
194 if result.is_err() {
195 let result_msg = result.message();
196 let string = OsString::from(result_msg);
197 return Err(string);
198 }
199 }
200 } else {
201 let mut size = MaybeUninit::<i64>::uninit();
202 unsafe {
203 let size_ptr = ptr::addr_of_mut!(*size.as_mut_ptr());
204 let size_ref = size_ptr.as_mut().unwrap();
205 result = if GetFileSizeEx(stream, size_ref).is_ok() {
206 S_OK
207 } else {
208 Error::from_thread().into()
209 };
210
211 if result.is_err() {
212 let result_msg = result.message();
213 let string = OsString::from(result_msg);
214 return Err(string);
215 }
216 size.assume_init();
217 }
218 }
219 }
220
221 const BUFFER_SIZE: usize = 32768;
222 let mut buf_vec: Vec<u8> = vec![0u8; BUFFER_SIZE];
223 let mut chars_read = MaybeUninit::<u32>::uninit();
224 let mut awaiting_io = false;
225 unsafe {
226 let chars_read_ptr = ptr::addr_of_mut!(*chars_read.as_mut_ptr());
227 let chars_read_mut = Some(chars_read_ptr);
228 result = if ReadFile(
229 stream,
230 Some(&mut buf_vec[..]),
231 chars_read_mut,
232 lp_overlapped,
233 )
234 .is_ok()
235 {
236 S_OK
237 } else {
238 let err = Error::from_thread();
239 if let None = lp_overlapped {
240 Error::from_thread().into()
241 } else if err.code() != ERROR_IO_PENDING.into() {
242 Error::from_thread().into()
243 } else {
244 awaiting_io = true;
245 S_OK
246 }
247 };
248
249 if result.is_err() {
250 let result_msg = result.message();
251 let string = OsString::from(result_msg);
252 return Err(string);
253 }
254
255 if let Some(overlapped) = lp_overlapped {
256 result = if awaiting_io {
257 if (*overlapped).Internal == STATUS_PENDING.0 as usize {
259 if WaitForSingleObjectEx((*overlapped).hEvent, INFINITE, false) != WAIT_OBJECT_0
260 {
261 Error::from_thread().into()
262 } else {
263 *chars_read_ptr = (*overlapped).InternalHigh as u32;
264 HRESULT((*overlapped).Internal as i32).into()
265 }
266 } else {
267 *chars_read_ptr = (*overlapped).InternalHigh as u32;
268 HRESULT((*overlapped).Internal as i32).into()
269 }
270 } else {
271 S_OK
272 };
273
274 if result.is_err() {
275 let result_msg = result.message();
276 let string = OsString::from(result_msg);
277 return Err(string);
278 }
279
280 let read_bytes = chars_read.assume_init();
281 if read_bytes == 0 {
282 return Ok((OsString::new(), false));
283 }
284 }
285 }
286
287 let mut vec_buf: Vec<u16> = vec![0u16; buf_vec.len()];
292
293 unsafe {
294 MultiByteToWideChar(
295 CP_UTF8,
296 MULTI_BYTE_TO_WIDE_CHAR_FLAGS(0),
297 &buf_vec[..],
298 Some(&mut vec_buf[..]),
299 );
300 }
301
302 let non_zeros_init = Vec::new();
303 let non_zeros: Vec<u16> =
304 vec_buf
305 .split(|x| x == &0)
306 .map(|x| x.to_vec())
307 .fold(non_zeros_init, |mut acc, mut x| {
308 acc.append(&mut x);
309 acc
310 });
311 let os_str = OsString::from_wide(&non_zeros[..]);
312 Ok((os_str, true))
313}
314
315fn is_alive(process: HANDLE) -> Result<bool, OsString> {
316 unsafe {
317 let is_timeout = WaitForSingleObject(process, 0);
318 let succ = is_timeout != WAIT_FAILED;
319
320 if succ {
321 let alive = is_timeout == WAIT_TIMEOUT;
322 Ok(alive)
323 } else {
324 let err: HRESULT = Error::from_thread().into();
325 let result_msg = err.message();
326 let string = OsString::from(result_msg);
327 Err(string)
328 }
329 }
330}
331
332fn wait_for_exit(process: HANDLE) -> Result<bool, OsString> {
333 unsafe {
334 let wait_status = WaitForSingleObject(process, INFINITE);
335 let succ = wait_status != WAIT_FAILED;
336 if succ {
337 let dead = wait_status == WAIT_OBJECT_0;
338 Ok(dead)
339 } else {
340 let err: HRESULT = Error::from_thread().into();
341 let result_msg = err.message();
342 let string = OsString::from(result_msg);
343 Err(string)
344 }
345 }
346}
347
348fn get_exitstatus(process: HANDLE) -> Result<Option<u32>, OsString> {
349 let mut exit = MaybeUninit::<u32>::uninit();
350 unsafe {
351 let exit_ptr: *mut u32 = ptr::addr_of_mut!(*exit.as_mut_ptr());
352 let exit_ref = exit_ptr.as_mut().unwrap();
353 let succ = GetExitCodeProcess(process, exit_ref).is_ok();
354
355 if succ {
356 let actual_exit = *exit_ptr;
357 exit.assume_init();
358 let alive = actual_exit == STATUS_PENDING.0 as u32;
359 let mut exitstatus: Option<u32> = None;
360 if !alive {
361 exitstatus = Some(actual_exit);
362 }
363 Ok(exitstatus)
364 } else {
365 let err: HRESULT = Error::from_thread().into();
366 let result_msg = err.message();
367 let string = OsString::from(result_msg);
368 Err(string)
369 }
370 }
371}
372
373fn is_eof(process: HANDLE, stream: HANDLE) -> Result<bool, OsString> {
374 let mut bytes = MaybeUninit::<u32>::uninit();
375 unsafe {
376 let bytes_ptr: *mut u32 = ptr::addr_of_mut!(*bytes.as_mut_ptr());
377 let bytes_ref = Some(bytes_ptr);
378 let succ = PeekNamedPipe(stream, None, 0, None, bytes_ref, None).is_ok();
379
380 let total_bytes = bytes.assume_init();
381 if succ {
382 match is_alive(process) {
383 Ok(alive) => {
384 let eof = !alive && total_bytes == 0;
385 Ok(eof)
386 }
387 Err(_) => Ok(true),
388 }
389 } else {
390 Ok(true)
391 }
392 }
393}
394
/// State shared between a spawned PTY process and the background thread(s) that service it.
pub struct PTYProcess {
    /// Handle of the spawned process; null until `set_process` is called.
    process: LocalHandle,
    /// Handle that `write` sends input to; closed on drop.
    conin: LocalHandle,
    /// Handle the reader thread reads output from.
    conout: LocalHandle,
    /// Process identifier; `0` until `set_process` is called.
    pid: u32,
    /// Whether dropping this value closes `process`.
    close_process: bool,
    /// Join handle of the reader thread spawned by `new`.
    reading_thread: Option<thread::JoinHandle<()>>,
    /// Join handle of the process-exit watcher thread; only `Some` in async mode.
    alive_thread: Option<thread::JoinHandle<()>>,
    /// Sender used on drop to send `false` to the reader thread, asking it to stop.
    reader_alive: Sender<bool>,
    /// Starts as `true`; the reader thread stores `false` when it finishes.
    reader_atomic: Arc<AtomicBool>,
    /// Event the reader thread signals right before it exits.
    reader_exit_event: LocalHandle,
    /// Channel delivering the process handle (or `None` on drop) to the reader thread.
    reader_process_out: Sender<Option<LocalHandle>>,
    /// Set to `true` once the reader thread has received its first message.
    reader_ready: Arc<AtomicBool>,
    /// Receives read results from the reader thread; a `None` item marks end-of-file.
    reader_out_rx: Receiver<Option<Result<OsString, OsString>>>,
    /// Whether overlapped (asynchronous) I/O is used.
    async_: bool,
    /// `OVERLAPPED` structure used for writes; only `Some` in async mode.
    write_overlapped: Option<OVERLAPPED>,
    /// Locked for the duration of `write`; the flag records whether an overlapped write is
    /// still pending.
    write_mutex: Arc<Mutex<bool>>,
}
432
433impl PTYProcess {
434 pub fn new(
445 conin: LocalHandle,
446 conout: LocalHandle,
447 using_pipes: bool,
448 async_: bool,
449 cleanup_tx: Option<mpsc::Sender<bool>>,
450 ) -> PTYProcess {
451 let thread_arc = Arc::new(AtomicBool::new(true));
452 let reader_arc = Arc::new(AtomicBool::new(false));
453
454 let reader_exit_handle = unsafe {
457 match CreateEventExW(None, None, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS.0) {
458 Ok(evt) => evt,
459 Err(_) => HANDLE::default(),
460 }
461 };
462 let reader_exit_event = LocalHandle::from(reader_exit_handle);
463
464 if !async_ {
465 let (reader_out_tx, reader_out_rx) =
467 unbounded::
468 <Option<Result<OsString, OsString>>>();
469 let (reader_alive_tx, reader_alive_rx) = unbounded::<bool>();
470 let (reader_process_tx, reader_process_rx) = unbounded::<Option<LocalHandle>>();
471 let spinlock_clone = Arc::clone(&thread_arc);
472 let reader_ready = Arc::clone(&reader_arc);
473 let reader_exit_for_thread = reader_exit_event;
474
475 let reader_thread = thread::spawn(move || {
476 let process_result = reader_process_rx.recv();
477 reader_ready.store(true, Ordering::Release);
478 if let Ok(Some(process)) = process_result {
479 let mut alive = reader_alive_rx
480 .try_recv()
481 .unwrap_or(true);
482 while alive
483 {
484 if !is_eof(process.into(), conout.into()).unwrap() {
485 match read(true, conout.into(), using_pipes, None) {
486 Ok((result, _)) => {
487 reader_out_tx.send(Some(Ok(result))).unwrap();
488 }
489 Err(err) => {
490 reader_out_tx.send(Some(Err(err))).unwrap();
491 }
492 }
493 alive = reader_alive_rx
494 .try_recv()
495 .unwrap_or(true);
496 } else {
497 reader_out_tx.send(None).unwrap();
498 alive = false;
499 }
500 }
501 }
502 spinlock_clone.store(false, Ordering::Release);
503
504 unsafe {
505 let _ = SetEvent(Into::<HANDLE>::into(reader_exit_for_thread));
506 }
507
508 drop(reader_process_rx);
509 drop(reader_alive_rx);
510 drop(reader_out_tx);
511 });
512
513 PTYProcess {
514 process: LocalHandle(std::ptr::null_mut()),
515 conin,
516 conout,
517 pid: 0,
518 close_process: true,
519 reading_thread: Some(reader_thread),
520 alive_thread: None,
521 reader_alive: reader_alive_tx,
522 reader_atomic: thread_arc,
523 reader_exit_event,
524 reader_process_out: reader_process_tx,
525 reader_ready: reader_arc,
526 reader_out_rx,
527 async_,
528 write_overlapped: None,
529 write_mutex: Arc::new(Mutex::new(false)),
530 }
531 } else {
532 let mut write_overlapped = OVERLAPPED::default();
533 unsafe {
534 match CreateEventExW(None, None, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS.0) {
535 Ok(evt) => {
536 write_overlapped.hEvent = evt;
537 }
538
539 Err(_) => (),
540 }
541 }
542
543 let (reader_out_tx, reader_out_rx) =
544 unbounded::<Option<Result<OsString, OsString>>>();
545 let (reader_alive_tx, reader_alive_rx) = unbounded::<bool>();
546 let (reader_process_tx, reader_process_rx) = unbounded::<Option<LocalHandle>>();
547 let spinlock_clone = Arc::clone(&thread_arc);
548 let reader_ready = Arc::clone(&reader_arc);
549 let (reader_process_2_tx, reader_process_2_rx) = unbounded::<LocalHandle>();
550 let reader_exit_for_thread = reader_exit_event;
551 let reader_exit_for_alive = reader_exit_event;
552
553 let reader_thread = thread::spawn(move || {
554 let mut read_overlapped = OVERLAPPED::default();
555 unsafe {
556 match CreateEventExW(None, None, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS.0)
557 {
558 Ok(evt) => {
559 read_overlapped.hEvent = evt;
560 }
561
562 Err(_) => (),
563 }
564 }
565
566 let process_result = reader_process_rx.recv();
567 reader_ready.store(true, Ordering::Release);
568 if let Ok(Some(process)) = process_result {
569 let _ = reader_process_2_tx.send(process);
570 let mut alive = true;
571 while alive {
572 match read(true, conout.into(), using_pipes, Some(&mut read_overlapped)) {
573 Ok((result, alive_status)) => {
574 reader_out_tx.send(Some(Ok(result))).unwrap();
575 alive = alive_status;
576 }
577 Err(err) => {
578 reader_out_tx.send(Some(Err(err))).unwrap();
579 alive = false;
580 }
581 }
582 }
583
584 unsafe {
585 let _ = CloseHandle(read_overlapped.hEvent);
586 }
587 }
588 spinlock_clone.store(false, Ordering::Release);
589
590 unsafe {
591 let _ = SetEvent(Into::<HANDLE>::into(reader_exit_for_thread));
592 }
593
594 drop(reader_process_rx);
595 drop(reader_alive_rx);
596 drop(reader_out_tx);
597 drop(reader_process_2_tx);
598 });
599
600 let alive_thread = thread::spawn(move || {
601 if let Ok(handle) = reader_process_2_rx.recv() {
602 let _ = wait_for_exit(handle.into());
603 unsafe {
604 let exit_handle = Into::<HANDLE>::into(reader_exit_for_alive);
611 if WaitForSingleObject(exit_handle, 5000) != WAIT_OBJECT_0 {
612 let _ = CancelIoEx(Into::<HANDLE>::into(conout), None);
613 loop {
614 if WaitForSingleObject(exit_handle, 50) == WAIT_OBJECT_0 {
615 break;
616 }
617 let _ = CancelIoEx(Into::<HANDLE>::into(conout), None);
618 }
619 }
620 if let Some(tx) = cleanup_tx {
624 let _ = tx.send(true).unwrap_or(());
625 }
626 }
627 }
628 drop(reader_process_2_rx);
629 });
630
631 PTYProcess {
632 process: LocalHandle(std::ptr::null_mut()),
633 conin,
634 conout,
635 pid: 0,
636 close_process: true,
637 reading_thread: Some(reader_thread),
638 alive_thread: Some(alive_thread),
639 reader_alive: reader_alive_tx,
640 reader_atomic: thread_arc,
641 reader_exit_event,
642 reader_process_out: reader_process_tx,
643 reader_ready: reader_arc,
644 reader_out_rx,
645 async_,
646 write_overlapped: Some(write_overlapped),
647 write_mutex: Arc::new(Mutex::new(false)),
648 }
649 }
650 }
651
652 pub fn read(&self, blocking: bool) -> Result<OsString, OsString> {
665 match blocking {
667 true => match self.reader_out_rx.recv() {
668 Ok(None) => Err(OsString::from("Standard out reached EOF")),
669 Ok(Some(bytes)) => bytes,
670 Err(_) => Ok(OsString::new()),
671 },
672 false => match self.reader_out_rx.try_recv() {
673 Ok(None) => Err(OsString::from("Standard out reached EOF")),
674 Ok(Some(bytes)) => bytes,
675 Err(_) => Ok(OsString::new()),
676 },
677 }
678 }
679
680 pub fn write(&self, buf: OsString) -> Result<u32, OsString> {
689 const BUFFER_SIZE: usize = 8192;
690 let vec_buf: Vec<u16> = buf.encode_wide().collect();
691
692 unsafe {
693 let required_size = WideCharToMultiByte(
694 CP_UTF8,
695 0,
696 &vec_buf[..],
697 None,
698 PCSTR(ptr::null_mut::<u8>()),
699 None,
700 );
701
702 let mut bytes_buf: Vec<u8> = std::iter::repeat(0)
703 .take((required_size) as usize)
704 .collect();
705
706 WideCharToMultiByte(
707 CP_UTF8,
708 0,
709 &vec_buf[..],
710 Some(&mut bytes_buf[..]),
711 PCSTR(ptr::null_mut::<u8>()),
712 None,
713 );
714
715 let mut total_written = 0u32;
716 let mut bytes_written = MaybeUninit::<u32>::uninit();
717 let bytes_ptr: *mut u32 = ptr::addr_of_mut!(*bytes_written.as_mut_ptr());
718 let bytes_ref = Some(bytes_ptr);
719
720 let c_mutex = Arc::clone(&self.write_mutex);
721 let mut write_pending = c_mutex.lock().unwrap();
722
723 for chunk in bytes_buf.chunks(BUFFER_SIZE) {
725 if self.async_ {
726 if *write_pending {
727 *write_pending = false;
728 if GetOverlappedResult(
729 Into::<HANDLE>::into(self.conin),
730 &mut self.write_overlapped.unwrap(),
731 bytes_ptr,
732 true,
733 )
734 .is_err()
735 {
736 let err: HRESULT = Error::from_thread().into();
737 let result_msg = err.message();
738 let string = OsString::from(result_msg);
739 return Err(string);
740 } else {
741 total_written += bytes_written.assume_init();
742 }
743 }
744
745 let write_result = if WriteFile(
746 Into::<HANDLE>::into(self.conin),
747 Some(chunk),
748 bytes_ref,
749 Some(&mut self.write_overlapped.unwrap()),
750 )
751 .is_ok()
752 {
753 S_OK
754 } else {
755 let err = Error::from_thread();
756 if err.code() == ERROR_IO_PENDING.into() {
757 *write_pending = true;
758 S_OK
759 } else {
760 Error::from_thread().into()
761 }
762 };
763
764 if write_result.is_err() {
765 let result_msg = write_result.message();
766 let string = OsString::from(result_msg);
767 return Err(string);
768 }
769 } else {
770 let write_result = if WriteFile(
771 Into::<HANDLE>::into(self.conin),
772 Some(chunk),
773 bytes_ref,
774 None,
775 )
776 .is_ok()
777 {
778 S_OK
779 } else {
780 Error::from_thread().into()
781 };
782 if write_result.is_err() {
783 let result_msg = write_result.message();
784 let string = OsString::from(result_msg);
785 return Err(string);
786 }
787 total_written += bytes_written.assume_init();
788 }
789 }
790 Ok(total_written)
791 }
792 }
793
794 pub fn is_eof(&self) -> Result<bool, OsString> {
800 let mut bytes = MaybeUninit::<u32>::uninit();
804 unsafe {
805 let bytes_ptr: *mut u32 = ptr::addr_of_mut!(*bytes.as_mut_ptr());
806 let bytes_ref = Some(bytes_ptr);
807 let mut succ = PeekNamedPipe(
808 Into::<HANDLE>::into(self.conout),
809 None,
810 0,
811 bytes_ref,
812 None,
813 None,
814 )
815 .is_ok();
816
817 let _total_bytes = bytes.assume_init();
818
819 let is_alive = match self.is_alive() {
820 Ok(alive) => {
821 alive || !self.reader_out_rx.is_empty()
822 },
823 Err(err) => {
824 return Err(err);
825 }
826 };
827
828 succ = succ || is_alive || self.reader_atomic.load(Ordering::Acquire);
829 Ok(!succ)
830 }
831 }
832
833 pub fn get_exitstatus(&self) -> Result<Option<u32>, OsString> {
838 if self.pid == 0 {
839 return Ok(None);
840 }
841
842 match get_exitstatus(self.process.into()) {
843 Ok(exitstatus) => Ok(exitstatus),
844 Err(err) => Err(err),
845 }
846 }
847
848 pub fn is_alive(&self) -> Result<bool, OsString> {
850 match is_alive(self.process.into()) {
853 Ok(alive) => Ok(alive),
854 Err(err) => Err(err),
855 }
856 }
857
858 pub fn set_process(&mut self, process: HANDLE, close_process: bool) {
860 self.process = process.into();
861 self.close_process = close_process;
862
863 self.reader_process_out.send(Some(process.into())).unwrap();
875 unsafe {
876 self.pid = GetProcessId(Into::<HANDLE>::into(self.process));
877 }
878 }
879
880 pub fn get_pid(&self) -> u32 {
882 self.pid
883 }
884
885 pub fn get_fd(&self) -> isize {
887 self.process.0 as isize
888 }
889
890 pub fn wait_for_exit(&self) -> Result<bool, OsString> {
892 wait_for_exit(self.process.into())
893 }
894
895 pub fn cancel_io(&self) -> Result<bool, OsString> {
897 unsafe {
898 if CancelIoEx(Into::<HANDLE>::into(self.conout), None).is_ok() {
899 Ok(true)
900 } else {
901 let result: HRESULT = Error::from_thread().into();
902 let result_msg = result.message();
903 let string = OsString::from(result_msg);
904 Err(string)
905 }
906 }
907 }
908}
909
910impl Drop for PTYProcess {
911 fn drop(&mut self) {
912 unsafe {
913 while !self.reader_ready.load(Ordering::Acquire) {
914 if self.reader_process_out.send(None).is_ok() {}
916 }
917
918 let _ = self.reader_alive.send(false);
924 let _ = CancelIoEx(Into::<HANDLE>::into(self.conout), None);
925
926 let exit_handle = Into::<HANDLE>::into(self.reader_exit_event);
927 if !self.reader_exit_event.is_invalid() {
928 loop {
929 if WaitForSingleObject(exit_handle, 50) == WAIT_OBJECT_0 {
930 break;
931 }
932 let _ = self.reader_alive.send(false);
933 let _ = CancelIoEx(Into::<HANDLE>::into(self.conout), None);
934 }
935 }
936
937 if let Some(thread_handle) = self.reading_thread.take() {
939 thread_handle.join().unwrap();
940 }
941
942 if !self.conin.is_invalid() {
943 let _ = CloseHandle(Into::<HANDLE>::into(self.conin));
944 }
945
946 if !self.conout.is_invalid() && !self.async_ {
947 let _ = CloseHandle(Into::<HANDLE>::into(self.conout));
948 }
949
950 if self.close_process && !self.process.is_invalid() {
951 let _ = CloseHandle(Into::<HANDLE>::into(self.process));
952 }
953
954 if let Some(thread_handle) = self.alive_thread.take() {
955 thread_handle.join().unwrap_or(());
956 }
957
958 if !self.reader_exit_event.is_invalid() {
959 let _ = CloseHandle(exit_handle);
960 }
961 }
962 }
963}