zng_view_api/
app_process.rs

1use std::{
2    collections::HashMap,
3    panic,
4    path::{Path, PathBuf},
5    sync::Arc,
6    thread::{self, JoinHandle},
7    time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_task::channel::ChannelError;
14use zng_txt::Txt;
15
16use crate::{
17    AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
18    ipc::{self, EventReceiver},
19};
20
/// Join handle of the event listener thread.
///
/// The listener thread returns the `on_event` closure when it finishes, so joining the thread
/// recovers the closure for reuse in the listener of a respawned view-process.
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;

/// Name of the environment variable set on the view-process with the API version of the app-process.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on the view-process with the IPC server name to connect to.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on the view-process with the mode, `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
27
/// Connection state of the view-process, as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// Initial state and state during a respawn, set until a matching `Event::Inited` is handled.
    NotRunning,
    /// An `Event::Inited` was handled, requests that must be connected are allowed.
    RunningAndConnected,
    /// An `Event::Suspended` was handled, the next `Event::Inited` defines the new generation.
    Suspended,
}
34
/// View Process controller, used in the App Process.
///
/// # Exit
///
/// The View Process is [killed] when the controller is dropped, if the app is running in same process mode
/// then the current process [exits] with code 0 on drop.
///
/// In multi-process mode the View Process is also killed to respawn if it does not send any event for the
/// `"ZNG_VIEW_TIMEOUT"` duration (10 seconds by default), the app must call [`Controller::ping`] periodically
/// to generate the [`Event::Pong`] to detect availability.
///
/// [killed]: std::process::Child::kill
/// [exits]: std::process::exit
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    // Child view-process and a flag that is `true` when the controller killed it (not responding).
    // Is `None` in same process mode and while a respawn is replacing the child.
    process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
    // Connection state, updated by the `handle_*` methods and by respawn.
    view_state: ViewState,
    // Generation of the current view-process, advances on every respawn.
    generation: ViewProcessGen,
    // If the current view-process is a respawn, forwarded to the init request.
    is_respawn: bool,
    // Executable and extra environment used to spawn (and respawn) the view-process.
    view_process_exe: PathBuf,
    view_process_env: HashMap<Txt, Txt>,
    // IPC channels connected to the current view-process.
    request_sender: ipc::RequestSender,
    response_receiver: ipc::ResponseReceiver,
    // Listener thread handle, joined on respawn to recover the `on_event` closure.
    event_listener: Option<EventListenerJoin>,
    headless: bool,
    same_process: bool,
    // Instant of the last crash respawn, `None` if the last respawn was requested by the app.
    last_respawn: Option<Instant>,
    // Count of crash respawns that happened less than 1 minute after the previous one.
    fast_respawn_count: u8,
}
/// Compile time assert that [`Controller`] is `Send + Sync`, never called.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
67impl Controller {
68    /// Start with a custom view process.
69    ///
70    /// The `view_process_exe` must be an executable that starts a view server.
71    /// Note that the [`VERSION`] of this crate must match in both executables.
72    ///
73    /// The `view_process_env` can be set to any env var needed to start the view-process. Note that if `view_process_exe`
74    /// is the current executable this most likely need set `zng_env::PROCESS_MAIN`.
75    ///
76    /// The `on_event` closure is called in another thread every time the app receives an event.
77    ///
78    /// # Tests
79    ///
80    /// The [`current_exe`] cannot be used in tests, you should set an external view-process executable. Unfortunately there
81    /// is no way to check if `start` was called in a test so we cannot provide an error message for this.
82    /// If the test is hanging in debug builds or has a timeout error in release builds this is probably the reason.
83    ///
84    /// # Connect Timeout
85    ///
86    /// If the view process takes longer than 10 seconds to connect it is considered failed and a respawn will be attempted.
87    /// This timeout is very reasonable in most cases, specially since users definitely need some visual feedback sooner, but
88    /// some test runner machines can be very slow. You can can set the `"ZNG_VIEW_TIMEOUT"` variable to a custom timeout in
89    /// seconds. The minimum value is 5 seconds. This timeout value is also used to define a *not responding* respawn.
90    ///
91    /// [`current_exe`]: std::env::current_exe
92    /// [`VERSION`]: crate::VERSION
93    pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
94    where
95        F: FnMut(Event) + Send + 'static,
96    {
97        Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
98    }
99    fn start_impl(
100        view_process_exe: PathBuf,
101        view_process_env: HashMap<Txt, Txt>,
102        headless: bool,
103        on_event: Box<dyn FnMut(Event) + Send>,
104    ) -> Self {
105        if ViewConfig::from_env().is_some() {
106            panic!("cannot start Controller in process configured to be view-process");
107        }
108
109        let (process, request_sender, response_receiver, event_receiver) =
110            Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
111        let same_process = process.is_none();
112        let process = Arc::new(Mutex::new(process.map(|p| (p, false))));
113        let ev = if same_process {
114            Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
115        } else {
116            Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
117        };
118
119        let mut c = Controller {
120            same_process,
121            view_state: ViewState::NotRunning,
122            process,
123            view_process_exe,
124            view_process_env,
125            request_sender,
126            response_receiver,
127            event_listener: Some(ev),
128            headless,
129            generation: ViewProcessGen::first(),
130            is_respawn: false,
131            last_respawn: None,
132            fast_respawn_count: 0,
133        };
134
135        if let Err(ChannelError::Disconnected { .. }) = c.try_init() {
136            panic!("respawn on init");
137        }
138
139        c
140    }
141    fn spawn_same_process_listener(
142        mut on_event: Box<dyn FnMut(Event) + Send>,
143        mut event_receiver: EventReceiver,
144        generation: ViewProcessGen,
145    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
146        thread::Builder::new()
147            .name("same_process_listener".into())
148            .spawn(move || {
149                while let Ok(ev) = event_receiver.recv() {
150                    on_event(ev);
151                }
152                on_event(Event::Disconnected(generation));
153
154                // return to reuse in respawn.
155                on_event
156            })
157            .expect("failed to spawn thread")
158    }
159    fn spawn_other_process_listener(
160        mut on_event: Box<dyn FnMut(Event) + Send>,
161        mut event_receiver: EventReceiver,
162        process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
163        generation: ViewProcessGen,
164    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
165        // spawns a thread that receives view-process events and monitors for process responsiveness
166        // - ipc-channel sometimes does not signal disconnect when the view-process dies, this monitors the process state every second.
167        // - app-process pings every 2s of inactivity, this kills the view-process it it does not respond for more them ZNG_VIEW_TIMEOUT.
168        thread::Builder::new()
169            .name("other_process_listener".into())
170            .spawn(move || {
171                const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
172                let timeout = view_timeout();
173                let mut check_count = 0u64;
174                loop {
175                    match event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
176                        Ok(ev) => {
177                            check_count = 0;
178                            on_event(ev)
179                        }
180                        Err(ChannelError::Timeout) => {
181                            if let Some(p) = &mut *process.lock() {
182                                match p.0.try_wait() {
183                                    Ok(c) => {
184                                        if c.is_some() {
185                                            // view-process died
186                                            break;
187                                        } else {
188                                            check_count += 1;
189                                            if check_count == timeout {
190                                                tracing::error!("view-process not responding for {timeout}s, will respawn");
191                                                let _ = p.0.kill();
192                                                p.1 = true;
193                                                break;
194                                            }
195                                        }
196                                    }
197                                    Err(e) => {
198                                        if e.kind() != std::io::ErrorKind::Interrupted {
199                                            tracing::error!("view-process try_wait error after inactivity, {e}");
200                                            break;
201                                        }
202                                    }
203                                }
204                            } else {
205                                // respawning already
206                                break;
207                            }
208                        }
209                        Err(_) => break,
210                    }
211                }
212                on_event(Event::Disconnected(generation));
213
214                // return to reuse in respawn.
215                on_event
216            })
217            .expect("failed to spawn thread")
218    }
219
220    fn try_init(&mut self) -> VpResult<()> {
221        self.init(self.generation, self.is_respawn, self.headless)?;
222        Ok(())
223    }
224
225    /// View-process is running, connected and ready to respond.
226    pub fn is_connected(&self) -> bool {
227        matches!(self.view_state, ViewState::RunningAndConnected)
228    }
229
    /// Generation of the current view-process.
    ///
    /// The generation advances every time the view-process is respawned.
    pub fn generation(&self) -> ViewProcessGen {
        self.generation
    }
234
    /// If the view-process was started in headless mode.
    pub fn headless(&self) -> bool {
        self.headless
    }
239
    /// If both view and app are running in the same process.
    ///
    /// This is `true` when no child process was spawned at start.
    pub fn same_process(&self) -> bool {
        self.same_process
    }
244
245    fn disconnected_err(&self) -> Result<(), ChannelError> {
246        if self.is_connected() {
247            Ok(())
248        } else {
249            Err(ChannelError::disconnected())
250        }
251    }
252
253    fn try_talk(&mut self, req: Request) -> Result<Response, ChannelError> {
254        self.request_sender.send(req)?;
255        self.response_receiver.recv()
256    }
257    pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
258        debug_assert!(req.expect_response());
259
260        if req.must_be_connected() {
261            self.disconnected_err()?;
262        }
263
264        match self.try_talk(req) {
265            Ok(r) => Ok(r),
266            Err(ChannelError::Disconnected { cause }) => {
267                self.handle_disconnect(self.generation);
268                Err(ChannelError::Disconnected { cause })
269            }
270            e => e,
271        }
272    }
273
274    pub(crate) fn command(&mut self, req: Request) -> Result<(), ChannelError> {
275        debug_assert!(!req.expect_response());
276
277        if req.must_be_connected() {
278            self.disconnected_err()?;
279        }
280
281        match self.request_sender.send(req) {
282            Ok(_) => Ok(()),
283            Err(ChannelError::Disconnected { cause }) => {
284                self.handle_disconnect(self.generation);
285                Err(ChannelError::Disconnected { cause })
286            }
287            e => e,
288        }
289    }
290
291    fn spawn_view_process(
292        view_process_exe: &Path,
293        view_process_env: &HashMap<Txt, Txt>,
294        headless: bool,
295    ) -> AnyResult<(
296        Option<std::process::Child>,
297        ipc::RequestSender,
298        ipc::ResponseReceiver,
299        ipc::EventReceiver,
300    )> {
301        let _span = tracing::trace_span!("spawn_view_process").entered();
302
303        let init = ipc::AppInit::new();
304
305        // create process and spawn it, unless is running in same process mode.
306        let process = if ViewConfig::is_awaiting_same_process() {
307            ViewConfig::set_same_process(ViewConfig {
308                version: crate::VERSION.into(),
309                server_name: Txt::from_str(init.name()),
310                headless,
311            });
312            None
313        } else {
314            #[cfg(not(ipc))]
315            {
316                let _ = (view_process_exe, view_process_env);
317                panic!("expected only same_process mode with `ipc` feature disabled");
318            }
319
320            #[cfg(ipc)]
321            {
322                let mut process = std::process::Command::new(view_process_exe);
323                for (name, val) in view_process_env {
324                    process.env(name, val);
325                }
326                let process = process
327                    .env(VIEW_VERSION, crate::VERSION)
328                    .env(VIEW_SERVER, init.name())
329                    .env(VIEW_MODE, if headless { "headless" } else { "headed" })
330                    .env("RUST_BACKTRACE", "full")
331                    .spawn()?;
332                Some(process)
333            }
334        };
335
336        let (req, rsp, ev) = match init.connect() {
337            Ok(r) => r,
338            Err(e) => {
339                #[cfg(ipc)]
340                if let Some(mut p) = process {
341                    if let Err(ke) = p.kill() {
342                        tracing::error!(
343                            "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
344                        );
345                    } else {
346                        match p.wait() {
347                            Ok(output) => {
348                                let code = output.code();
349                                if ViewConfig::is_version_err(code, None) {
350                                    let code = code.unwrap_or(1);
351                                    tracing::error!(
352                                        "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
353                                                will exit app-process with code 0x{code:x}"
354                                    );
355                                    zng_env::exit(code);
356                                } else {
357                                    tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
358                                }
359                            }
360                            Err(e) => {
361                                tracing::error!("failed to read output status of killed view-process, {e}");
362                            }
363                        }
364                    }
365                } else {
366                    tracing::error!("failed to connect with same process");
367                }
368                return Err(e);
369            }
370        };
371
372        Ok((process, req, rsp, ev))
373    }
374
375    /// Handle an [`Event::Inited`].
376    ///
377    /// Set the connected flag to `true`.
378    pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
379        match self.view_state {
380            ViewState::NotRunning => {
381                if self.generation == vp_gen {
382                    // crash respawn already sets gen
383                    self.view_state = ViewState::RunningAndConnected;
384                }
385            }
386            ViewState::Suspended => {
387                self.generation = vp_gen;
388                self.view_state = ViewState::RunningAndConnected;
389            }
390            ViewState::RunningAndConnected => {}
391        }
392    }
393
    /// Handle an [`Event::Suspended`].
    ///
    /// Sets the state to suspended, so [`is_connected`] is `false` until the next [`Event::Inited`]
    /// is handled.
    ///
    /// [`is_connected`]: Controller::is_connected
    pub fn handle_suspended(&mut self) {
        self.view_state = ViewState::Suspended;
    }
400
401    /// Handle an [`Event::Disconnected`].
402    ///
403    /// The `gen` parameter is the generation provided by the event. It is used to determinate if the disconnect has
404    /// not been handled already.
405    ///
406    /// Tries to cleanup the old view-process and start a new one, if all is successful an [`Event::Inited`] is send.
407    ///
408    /// The old view-process exit code and std output is logged using the `vp_respawn` target.
409    ///
410    /// Exits the current process with code `1` if the view-process was killed by the user. In Windows this is if
411    /// the view-process exit code is `1`. In Unix if it was killed by SIGKILL, SIGSTOP, SIGINT.
412    ///
413    /// # Panics
414    ///
415    /// If the last five respawns happened all within 500ms of the previous respawn.
416    ///
417    /// If the an error happens three times when trying to spawn the new view-process.
418    ///
419    /// If another disconnect happens during the view-process startup dialog.
420    pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
421        if vp_gen == self.generation {
422            #[cfg(not(ipc))]
423            {
424                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
425            }
426
427            #[cfg(ipc)]
428            {
429                self.respawn_impl(true)
430            }
431        } else {
432            tracing::warn!("disconnected event from previous generation ignored")
433        }
434    }
435
    /// Reopen the view-process, causing another [`Event::Inited`].
    ///
    /// This is similar to [`handle_disconnect`] but the running view-process is always killed by the
    /// controller, so the checks for a view-process killed by the user and for fast crash respawns
    /// are not applied.
    ///
    /// In builds without `ipc` the view-process cannot be respawned, only an error is logged.
    ///
    /// [`handle_disconnect`]: Controller::handle_disconnect
    pub fn respawn(&mut self) {
        #[cfg(not(ipc))]
        {
            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
        }

        #[cfg(ipc)]
        self.respawn_impl(false);
    }
451    #[cfg(ipc)]
452    fn respawn_impl(&mut self, is_crash: bool) {
453        use zng_unit::TimeUnits;
454
455        self.view_state = ViewState::NotRunning;
456        self.is_respawn = true;
457
458        let (mut process, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
459            p
460        } else {
461            if self.same_process {
462                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
463            }
464            return;
465        };
466        if is_crash {
467            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
468        }
469
470        if is_crash {
471            let t = Instant::now();
472            if let Some(last_respawn) = self.last_respawn {
473                if t - last_respawn < Duration::from_secs(60) {
474                    self.fast_respawn_count += 1;
475                    if self.fast_respawn_count == 2 {
476                        panic!("disconnect respawn happened 2 times in less than 1 minute");
477                    }
478                } else {
479                    self.fast_respawn_count = 0;
480                }
481            }
482            self.last_respawn = Some(t);
483        } else {
484            self.last_respawn = None;
485        }
486
487        // try exit
488        if !is_crash {
489            let _ = process.kill();
490            killed_by_us = true;
491        } else if !matches!(process.try_wait(), Ok(Some(_))) {
492            // if not exited, give the process 300ms to close with the preferred exit code.
493            thread::sleep(300.ms());
494
495            if !matches!(process.try_wait(), Ok(Some(_))) {
496                // if still not exited, kill it.
497                killed_by_us = true;
498                let _ = process.kill();
499            }
500        }
501
502        let exit_status = match process.wait() {
503            Ok(c) => Some(c),
504            Err(e) => {
505                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
506                None
507            }
508        };
509
510        // try print stdout/err and exit code.
511        if let Some(c) = exit_status {
512            tracing::info!(target: "vp_respawn", "view-process killed");
513
514            let code = c.code();
515            #[allow(unused_mut)]
516            let mut signal = None::<i32>;
517
518            if !killed_by_us {
519                // check if user killed the view-process, in this case we exit too.
520
521                #[cfg(windows)]
522                if code == Some(1) {
523                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
524                                        will exit app-process with the same code");
525                    zng_env::exit(1);
526                }
527
528                #[cfg(unix)]
529                if code.is_none() {
530                    use std::os::unix::process::ExitStatusExt as _;
531                    signal = c.signal();
532
533                    if let Some(sig) = signal
534                        && [2, 9, 17, 19, 23].contains(&sig)
535                    {
536                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
537                                            will exit app-process with code 1");
538                        zng_env::exit(1);
539                    }
540                }
541            }
542
543            if !killed_by_us {
544                let code = code.unwrap_or(0);
545                let signal = signal.unwrap_or(0);
546                tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
547            }
548
549            if ViewConfig::is_version_err(code, None) {
550                let code = code.unwrap_or(1);
551                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
552                                        will exit app-process with code 0x{code:x}");
553                zng_env::exit(code);
554            }
555        } else {
556            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
557        }
558
559        // recover event listener closure (in a box).
560        let on_event = match self.event_listener.take().unwrap().join() {
561            Ok(fn_) => fn_,
562            Err(p) => panic::resume_unwind(p),
563        };
564
565        // respawn
566        let mut retries = 3;
567        let (new_process, request, response, event_listener) = loop {
568            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
569                Ok(r) => break r,
570                Err(e) => {
571                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
572                    retries -= 1;
573                    if retries == 0 {
574                        panic!("failed to respawn `view-process` after 3 retries");
575                    }
576                    tracing::info!(target: "vp_respawn", "retrying respawn");
577                }
578            }
579        };
580
581        // update connections
582        self.process = Arc::new(Mutex::new(Some((new_process.unwrap(), false))));
583        self.request_sender = request;
584        self.response_receiver = response;
585
586        let next_id = self.generation.next();
587        self.generation = next_id;
588
589        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
590        self.event_listener = Some(ev);
591
592        if let Err(ChannelError::Disconnected { .. }) = self.try_init() {
593            panic!("respawn on respawn startup");
594        }
595    }
596}
597impl Drop for Controller {
598    /// Kills the View Process, unless it is running in the same process.
599    fn drop(&mut self) {
600        let _ = self.exit();
601        #[cfg(ipc)]
602        if let Some((mut process, _)) = self.process.lock().take()
603            && process.try_wait().is_err()
604        {
605            std::thread::sleep(Duration::from_secs(1));
606            if process.try_wait().is_err() {
607                tracing::error!("view-process did not exit after 1s, killing");
608                let _ = process.kill();
609                let _ = process.wait();
610            }
611        }
612    }
613}
614
615const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
616/// Timeout in seconds.
617pub(crate) fn view_timeout() -> u64 {
618    match std::env::var(VIEW_TIMEOUT) {
619        Ok(s) if !s.is_empty() => match s.parse::<u64>() {
620            Ok(s) => s.max(5),
621            Err(e) => {
622                tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
623                10
624            }
625        },
626        _ => 10,
627    }
628}