Skip to main content

zng_view_api/
app_process.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    collections::HashMap,
5    panic,
6    path::{Path, PathBuf},
7    sync::Arc,
8    thread::{self, JoinHandle},
9    time::Instant,
10};
11
12use std::time::Duration;
13
14use zng_task::channel::ChannelError;
15use zng_task::parking_lot::Mutex;
16use zng_txt::Txt;
17
18use crate::{
19    AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
20    ipc::{self, EventReceiver},
21};
22
23/// The listener returns the closure on join for reuse in respawn.
24type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
25
/// Env var set on the spawned view-process with this crate's [`VERSION`](crate::VERSION).
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Env var set on the spawned view-process with the IPC server name (`AppInit::name`).
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Env var set on the spawned view-process with the mode, `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
29
/// Connection state of the view-process, as tracked by the `Controller`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ViewState {
    /// Not running yet or respawning, becomes connected on an `Event::Inited` of the current generation.
    NotRunning,
    /// Running, connected and ready to respond to requests.
    RunningAndConnected,
    /// Suspended, becomes connected again on the next `Event::Inited`.
    Suspended,
}
36
// In `ipc` builds the view-process stderr is captured by `StderrTap`, it is read back on respawn
// to recover the panic message of a crashed view-process.
#[cfg(ipc)]
use zng_task::process::tap::StderrTap;
// Without `ipc` no child process is ever spawned, this placeholder only keeps the
// `Controller::process` tuple type valid.
#[cfg(not(ipc))]
struct StderrTap;
41
42/// View Process controller, used in the App Process.
43///
44/// # Exit
45///
46/// The View Process is [killed] when the controller is dropped, if the app is running in same process mode
47/// then the current process [exits] with code 0 on drop.
48///
49/// In multi-process mode the View Process is also killed to respawn if it does not send any event after 30 seconds,
50/// the app must call [`Controller::ping`] periodically to generate the [`Event::Pong`] to detect availability.
51///
52/// [killed]: std::process::Child::kill
53/// [exits]: std::process::exit
54pub struct Controller {
55    process: Arc<Mutex<Option<(std::process::Child, StderrTap, bool)>>>,
56    view_state: ViewState,
57    generation: ViewProcessGen,
58    is_respawn: bool,
59    view_process_exe: PathBuf,
60    view_process_env: HashMap<Txt, Txt>,
61    request_sender: ipc::RequestSender,
62    response_receiver: ipc::ResponseReceiver,
63    event_listener: Option<EventListenerJoin>,
64    headless: bool,
65    same_process: bool,
66    last_respawn: Option<Instant>,
67    fast_respawn_count: u8,
68}
/// Compile-time check that `Controller` is `Send + Sync`.
///
/// Not meant to be called, the crate's tests only fail to build if either auto-trait stops holding.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
73impl Controller {
74    /// Start with a custom view process.
75    ///
76    /// The `view_process_exe` must be an executable that starts a view server.
77    /// Note that the [`VERSION`] of this crate must match in both executables.
78    ///
79    /// The `view_process_env` can be set to any env var needed to start the view-process. Note that if `view_process_exe`
80    /// is the current executable this most likely need set `zng_env::PROCESS_MAIN`.
81    ///
82    /// The `on_event` closure is called in another thread every time the app receives an event.
83    ///
84    /// # Tests
85    ///
86    /// The [`current_exe`] cannot be used in tests, you should set an external view-process executable. Unfortunately there
87    /// is no way to check if `start` was called in a test so we cannot provide an error message for this.
88    /// If the test is hanging in debug builds or has a timeout error in release builds this is probably the reason.
89    ///
90    /// # Connect Timeout
91    ///
92    /// If the view process takes longer than 10 seconds to connect it is considered failed and a respawn will be attempted.
93    /// This timeout is very reasonable in most cases, specially since users definitely need some visual feedback sooner, but
94    /// some test runner machines can be very slow. You can can set the `"ZNG_VIEW_TIMEOUT"` variable to a custom timeout in
95    /// seconds. The minimum value is 5 seconds. This timeout value is also used to define a *not responding* respawn.
96    ///
97    /// [`current_exe`]: std::env::current_exe
98    /// [`VERSION`]: crate::VERSION
99    pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
100    where
101        F: FnMut(Event) + Send + 'static,
102    {
103        Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
104    }
105    fn start_impl(
106        view_process_exe: PathBuf,
107        view_process_env: HashMap<Txt, Txt>,
108        headless: bool,
109        on_event: Box<dyn FnMut(Event) + Send>,
110    ) -> Self {
111        if ViewConfig::from_env().is_some() {
112            panic!("cannot start Controller in process configured to be view-process");
113        }
114
115        let (process, request_sender, response_receiver, event_receiver) =
116            Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
117        let same_process = process.is_none();
118        let process = Arc::new(Mutex::new(process.map(|(p, e)| (p, e, false))));
119        let ev = if same_process {
120            Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
121        } else {
122            #[cfg(ipc)]
123            {
124                Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
125            }
126            #[cfg(not(ipc))]
127            {
128                unreachable!()
129            }
130        };
131
132        let mut c = Controller {
133            same_process,
134            view_state: ViewState::NotRunning,
135            process,
136            view_process_exe,
137            view_process_env,
138            request_sender,
139            response_receiver,
140            event_listener: Some(ev),
141            headless,
142            generation: ViewProcessGen::first(),
143            is_respawn: false,
144            last_respawn: None,
145            fast_respawn_count: 0,
146        };
147
148        if let Err(ChannelError::Disconnected { .. }) = c.try_init() {
149            panic!("respawn on init");
150        }
151
152        c
153    }
154    fn spawn_same_process_listener(
155        mut on_event: Box<dyn FnMut(Event) + Send>,
156        mut event_receiver: EventReceiver,
157        generation: ViewProcessGen,
158    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
159        thread::Builder::new()
160            .name("same_process_listener".into())
161            .spawn(move || {
162                while let Ok(ev) = event_receiver.recv() {
163                    on_event(ev);
164                }
165                on_event(Event::Disconnected(generation));
166
167                // return to reuse in respawn.
168                on_event
169            })
170            .expect("failed to spawn thread")
171    }
172    #[cfg(ipc)]
173    fn spawn_other_process_listener(
174        mut on_event: Box<dyn FnMut(Event) + Send>,
175        mut event_receiver: EventReceiver,
176        process: Arc<Mutex<Option<(std::process::Child, StderrTap, bool)>>>,
177        generation: ViewProcessGen,
178    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
179        // spawns a thread that receives view-process events and monitors for process responsiveness
180        // - ipc-channel sometimes does not signal disconnect when the view-process dies, this monitors the process state every second.
181        // - app-process pings every 2s of inactivity, this kills the view-process it it does not respond for more them ZNG_VIEW_TIMEOUT.
182        thread::Builder::new()
183            .name("other_process_listener".into())
184            .spawn(move || {
185                const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
186                let timeout = view_timeout();
187                let mut check_count = 0u64;
188                loop {
189                    match event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
190                        Ok(ev) => {
191                            check_count = 0;
192                            on_event(ev)
193                        }
194                        Err(ChannelError::Timeout) => {
195                            if let Some(p) = &mut *process.lock() {
196                                match p.0.try_wait() {
197                                    Ok(c) => {
198                                        if c.is_some() {
199                                            // view-process died
200                                            break;
201                                        } else {
202                                            check_count += 1;
203                                            if check_count == timeout {
204                                                tracing::error!("view-process not responding for {timeout}s, will respawn");
205                                                let _ = p.0.kill();
206                                                p.2 = true;
207                                                break;
208                                            }
209                                        }
210                                    }
211                                    Err(e) => {
212                                        if e.kind() != std::io::ErrorKind::Interrupted {
213                                            tracing::error!("view-process try_wait error after inactivity, {e}");
214                                            break;
215                                        }
216                                    }
217                                }
218                            } else {
219                                // respawning already
220                                break;
221                            }
222                        }
223                        Err(_) => break,
224                    }
225                }
226                on_event(Event::Disconnected(generation));
227
228                // return to reuse in respawn.
229                on_event
230            })
231            .expect("failed to spawn thread")
232    }
233
234    fn try_init(&mut self) -> VpResult<()> {
235        self.init(self.generation, self.is_respawn, self.headless)?;
236        Ok(())
237    }
238
239    /// View-process is running, connected and ready to respond.
240    pub fn is_connected(&self) -> bool {
241        matches!(self.view_state, ViewState::RunningAndConnected)
242    }
243
244    /// View-process generation.
245    pub fn generation(&self) -> ViewProcessGen {
246        self.generation
247    }
248
249    /// If is running in headless mode.
250    pub fn headless(&self) -> bool {
251        self.headless
252    }
253
254    /// If is running both view and app in the same process.
255    pub fn same_process(&self) -> bool {
256        self.same_process
257    }
258
259    fn try_talk(&mut self, req: Request) -> Result<Response, ChannelError> {
260        self.request_sender.send(req)?;
261        self.response_receiver.recv()
262    }
263    pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
264        debug_assert!(req.expect_response());
265
266        tracing::trace!("talk {req:?}");
267
268        if req.must_be_connected() && !self.is_connected() {
269            tracing::error!("cannot send request {req:?}, not connected");
270            return Err(ChannelError::disconnected());
271        }
272
273        match self.try_talk(req) {
274            Ok(r) => {
275                tracing::trace!("talk {r:?}");
276                Ok(r)
277            }
278            Err(ChannelError::Disconnected { cause }) => {
279                self.handle_disconnect(self.generation);
280                Err(ChannelError::Disconnected { cause })
281            }
282            e => e,
283        }
284    }
285
286    pub(crate) fn command(&mut self, req: Request) -> Result<(), ChannelError> {
287        debug_assert!(!req.expect_response());
288
289        tracing::trace!("command {req:?}");
290
291        if req.must_be_connected() && !self.is_connected() {
292            tracing::error!("cannot send request {req:?}, not connected");
293            return Err(ChannelError::disconnected());
294        }
295
296        match self.request_sender.send(req) {
297            Ok(_) => {
298                tracing::trace!("command ok");
299                Ok(())
300            }
301            Err(ChannelError::Disconnected { cause }) => {
302                self.handle_disconnect(self.generation);
303                Err(ChannelError::Disconnected { cause })
304            }
305            e => e,
306        }
307    }
308
309    #[allow(clippy::type_complexity)]
310    fn spawn_view_process(
311        view_process_exe: &Path,
312        view_process_env: &HashMap<Txt, Txt>,
313        headless: bool,
314    ) -> AnyResult<(
315        Option<(std::process::Child, StderrTap)>,
316        ipc::RequestSender,
317        ipc::ResponseReceiver,
318        ipc::EventReceiver,
319    )> {
320        let _span = tracing::trace_span!("spawn_view_process").entered();
321
322        let init = ipc::AppInit::new();
323
324        // create process and spawn it, unless is running in same process mode.
325        let process = if ViewConfig::is_awaiting_same_process() {
326            ViewConfig::set_same_process(ViewConfig {
327                version: crate::VERSION.into(),
328                server_name: Txt::from_str(init.name()),
329                headless,
330            });
331            None
332        } else {
333            #[cfg(not(ipc))]
334            {
335                let _ = (view_process_exe, view_process_env);
336                panic!("expected only same_process mode with `ipc` feature disabled");
337            }
338
339            #[cfg(ipc)]
340            {
341                let mut process = std::process::Command::new(view_process_exe);
342                for (name, val) in view_process_env {
343                    process.env(name, val);
344                }
345                let mut process = process
346                    .env(VIEW_VERSION, crate::VERSION)
347                    .env(VIEW_SERVER, init.name())
348                    .env(VIEW_MODE, if headless { "headless" } else { "headed" })
349                    .env("RUST_BACKTRACE", "full")
350                    .stderr(std::process::Stdio::piped())
351                    .spawn()?;
352
353                let stderr = StderrTap::new_blocking(process.stderr.take().unwrap());
354
355                Some((process, stderr))
356            }
357        };
358
359        let (req, rsp, ev) = match init.connect() {
360            Ok(r) => r,
361            Err(e) => {
362                #[cfg(ipc)]
363                if let Some((mut p, _)) = process {
364                    if let Err(ke) = p.kill() {
365                        tracing::error!(
366                            "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
367                        );
368                    } else {
369                        match p.wait() {
370                            Ok(output) => {
371                                let code = output.code();
372                                if ViewConfig::is_version_err(code, None) {
373                                    let code = code.unwrap_or(1);
374                                    tracing::error!(
375                                        "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
376                                                will exit app-process with code 0x{code:x}"
377                                    );
378                                    zng_env::exit(code);
379                                } else {
380                                    tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
381                                }
382                            }
383                            Err(e) => {
384                                tracing::error!("failed to read output status of killed view-process, {e}");
385                            }
386                        }
387                    }
388                } else {
389                    tracing::error!("failed to connect with same process");
390                }
391                return Err(e);
392            }
393        };
394
395        Ok((process, req, rsp, ev))
396    }
397
398    /// Handle an [`Event::Inited`].
399    ///
400    /// Set the connected flag to `true`.
401    pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
402        match self.view_state {
403            ViewState::NotRunning => {
404                if self.generation == vp_gen {
405                    // crash respawn already sets gen
406                    self.view_state = ViewState::RunningAndConnected;
407                }
408            }
409            ViewState::Suspended => {
410                self.generation = vp_gen;
411                self.view_state = ViewState::RunningAndConnected;
412            }
413            ViewState::RunningAndConnected => {}
414        }
415    }
416
417    /// Handle an [`Event::Suspended`].
418    ///
419    /// Set the connected flat to `false`.
420    pub fn handle_suspended(&mut self) {
421        self.view_state = ViewState::Suspended;
422    }
423
424    /// Handle an [`Event::Disconnected`].
425    ///
426    /// The `gen` parameter is the generation provided by the event. It is used to determinate if the disconnect has
427    /// not been handled already.
428    ///
429    /// Tries to cleanup the old view-process and start a new one, if all is successful an [`Event::Inited`] is send.
430    ///
431    /// The old view-process exit code and std output is logged using the `vp_respawn` target.
432    ///
433    /// Exits the current process with code `1` if the view-process was killed by the user. In Windows this is if
434    /// the view-process exit code is `1`. In Unix if it was killed by SIGKILL, SIGSTOP, SIGINT.
435    ///
436    /// # Panics
437    ///
438    /// If the last five respawns happened all within 500ms of the previous respawn.
439    ///
440    /// If the an error happens three times when trying to spawn the new view-process.
441    ///
442    /// If another disconnect happens during the view-process startup dialog.
443    pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
444        if vp_gen == self.generation {
445            #[cfg(not(ipc))]
446            {
447                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
448            }
449
450            #[cfg(ipc)]
451            {
452                self.respawn_impl(true)
453            }
454        } else {
455            tracing::warn!("disconnected event from previous generation ignored")
456        }
457    }
458
459    /// Reopen the view-process, causing another [`Event::Inited`].
460    ///
461    /// This is similar to [`handle_disconnect`] but the current process does not
462    /// exit depending on the view-process exit code.
463    ///
464    /// [`handle_disconnect`]: Controller::handle_disconnect
465    pub fn respawn(&mut self) {
466        #[cfg(not(ipc))]
467        {
468            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
469        }
470
471        #[cfg(ipc)]
472        self.respawn_impl(false);
473    }
474    #[cfg(ipc)]
475    fn respawn_impl(&mut self, is_crash: bool) {
476        use zng_unit::TimeUnits;
477
478        self.view_state = ViewState::NotRunning;
479        self.is_respawn = true;
480
481        let (mut process, stderr, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
482            p
483        } else {
484            if self.same_process {
485                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
486            }
487            return;
488        };
489        if is_crash {
490            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
491        }
492
493        if is_crash {
494            let t = Instant::now();
495            if let Some(last_respawn) = self.last_respawn {
496                if t - last_respawn < Duration::from_secs(60) {
497                    self.fast_respawn_count += 1;
498                    if self.fast_respawn_count == 2 {
499                        panic!("disconnect respawn happened 2 times in less than 1 minute");
500                    }
501                } else {
502                    self.fast_respawn_count = 0;
503                }
504            }
505            self.last_respawn = Some(t);
506        } else {
507            self.last_respawn = None;
508        }
509
510        // try exit
511        if !is_crash {
512            let _ = process.kill();
513            killed_by_us = true;
514        } else if !matches!(process.try_wait(), Ok(Some(_))) {
515            // if not exited, give the process 300ms to close with the preferred exit code.
516            thread::sleep(300.ms());
517
518            if !matches!(process.try_wait(), Ok(Some(_))) {
519                // if still not exited, kill it.
520                killed_by_us = true;
521                let _ = process.kill();
522            }
523        }
524
525        let exit_status = match process.wait() {
526            Ok(c) => Some(c),
527            Err(e) => {
528                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
529                None
530            }
531        };
532
533        // try print stdout/err and exit code.
534        if let Some(c) = exit_status {
535            tracing::info!(target: "vp_respawn", "view-process killed");
536
537            let code = c.code();
538            #[allow(unused_mut)]
539            let mut signal = None::<i32>;
540
541            if !killed_by_us {
542                // check if user killed the view-process, in this case we exit too.
543
544                #[cfg(windows)]
545                if code == Some(1) {
546                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
547                                        will exit app-process with the same code");
548                    zng_env::exit(1);
549                }
550
551                #[cfg(unix)]
552                if code.is_none() {
553                    use std::os::unix::process::ExitStatusExt as _;
554                    signal = c.signal();
555
556                    if let Some(sig) = signal
557                        && [2, 9, 17, 19, 23].contains(&sig)
558                    {
559                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
560                                            will exit app-process with code 1");
561                        zng_env::exit(1);
562                    }
563                }
564            }
565
566            if !killed_by_us {
567                let code = code.unwrap_or(0);
568                let signal = signal.unwrap_or(0);
569                if code == 101
570                    && let Ok(panic) = stderr.into_panic_blocking()
571                {
572                    tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}, panic: {}", panic.display_no_backtrace());
573                } else {
574                    tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
575                }
576            }
577
578            if ViewConfig::is_version_err(code, None) {
579                let code = code.unwrap_or(1);
580                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
581                                        will exit app-process with code 0x{code:x}");
582                zng_env::exit(code);
583            }
584        } else {
585            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
586        }
587
588        // recover event listener closure (in a box).
589        let on_event = match self.event_listener.take().unwrap().join() {
590            Ok(fn_) => fn_,
591            Err(p) => panic::resume_unwind(p),
592        };
593
594        // respawn
595        let mut retries = 3;
596        let (new_process, request, response, event_listener) = loop {
597            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
598                Ok(r) => break r,
599                Err(e) => {
600                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
601                    retries -= 1;
602                    if retries == 0 {
603                        panic!("failed to respawn `view-process` after 3 retries");
604                    }
605                    tracing::info!(target: "vp_respawn", "retrying respawn");
606                }
607            }
608        };
609
610        // update connections
611        let (new_process, new_stderr) = new_process.unwrap();
612        self.process = Arc::new(Mutex::new(Some((new_process, new_stderr, false))));
613        self.request_sender = request;
614        self.response_receiver = response;
615
616        let next_id = self.generation.next();
617        self.generation = next_id;
618
619        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
620        self.event_listener = Some(ev);
621
622        if let Err(ChannelError::Disconnected { .. }) = self.try_init() {
623            panic!("respawn on respawn startup");
624        }
625    }
626}
627impl Drop for Controller {
628    /// Kills the View Process, unless it is running in the same process.
629    fn drop(&mut self) {
630        let _ = self.exit();
631        #[cfg(ipc)]
632        if let Some((mut process, _, _)) = self.process.lock().take()
633            && process.try_wait().is_err()
634        {
635            std::thread::sleep(Duration::from_secs(1));
636            if process.try_wait().is_err() {
637                tracing::error!("view-process did not exit after 1s, killing");
638                let _ = process.kill();
639                let _ = process.wait();
640            }
641        }
642    }
643}
644
645const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
646const VIEW_TIMEOUT_DEFAULT: u64 = 20;
647/// Timeout in seconds.
648pub(crate) fn view_timeout() -> u64 {
649    match std::env::var(VIEW_TIMEOUT) {
650        Ok(s) if !s.is_empty() => match s.parse::<u64>() {
651            Ok(s) => s.max(5),
652            Err(e) => {
653                if s == "false" {
654                    return u64::MAX;
655                }
656                tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
657                VIEW_TIMEOUT_DEFAULT
658            }
659        },
660        _ => VIEW_TIMEOUT_DEFAULT,
661    }
662}