zng_view_api/
app_process.rs

1use std::{
2    collections::HashMap,
3    panic,
4    path::{Path, PathBuf},
5    sync::Arc,
6    thread::{self, JoinHandle},
7    time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_txt::Txt;
14
15use crate::{
16    AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
17    ipc::{self, EventReceiver},
18};
19
/// Join handle of the event listener thread.
///
/// The listener thread returns the `on_event` closure when it finishes, joining the thread
/// recovers the closure so that it can be reused by the listener spawned on respawn.
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
22
/// Name of the environment variable set on the spawned view-process with the [`VERSION`] of this crate.
///
/// [`VERSION`]: crate::VERSION
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on the spawned view-process with the name of the server to connect to.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on the spawned view-process with the mode, `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
26
/// Connection state of the view-process, as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// State on start and during a respawn, until the `Inited` event of the current generation is handled.
    NotRunning,
    /// State set by [`Controller::handle_inited`], this is the only state in which
    /// [`Controller::is_connected`] returns `true`.
    RunningAndConnected,
    /// State set by [`Controller::handle_suspended`], until the next `Inited` event is handled.
    Suspended,
}
33
/// View Process controller, used in the App Process.
///
/// # Exit
///
/// The View Process is [killed] when the controller is dropped, if the app is running in same process mode
/// then the current process [exits] with code 0 on drop.
///
/// In multi-process mode the View Process is also killed to respawn if it does not send any event for the
/// `"ZNG_VIEW_TIMEOUT"` duration (10 seconds by default), the app must call [`Controller::ping`] periodically
/// to generate the [`Event::Pong`] to detect availability.
///
/// [killed]: std::process::Child::kill
/// [exits]: std::process::exit
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    /// Spawned view-process and a flag that is `true` when the listener thread killed it for not responding.
    ///
    /// Is `None` in same process mode and after the process was taken for respawn or drop. The value is
    /// shared with the listener thread that monitors the process.
    process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
    /// Connection state, updated by the `handle_*` methods and by respawn.
    view_state: ViewState,
    /// Generation of the current view-process, advances on every respawn.
    generation: ViewProcessGen,
    /// Set to `true` by the first respawn, this value is passed to the init request.
    is_respawn: bool,
    /// Executable used to spawn the view-process, kept for respawn.
    view_process_exe: PathBuf,
    /// Environment variables set on the spawned view-process, kept for respawn.
    view_process_env: HashMap<Txt, Txt>,
    /// Channel used to send requests to the view-process.
    request_sender: ipc::RequestSender,
    /// Channel used to receive the response of the last request.
    response_receiver: ipc::ResponseReceiver,
    /// Listener thread handle, joined on respawn to recover the `on_event` closure.
    event_listener: Option<EventListenerJoin>,
    /// Mode requested from the view-process.
    headless: bool,
    /// If no child process was spawned, view and app share the current process.
    same_process: bool,
    /// Instant of the last respawn caused by a disconnect, cleared by a requested respawn.
    last_respawn: Option<Instant>,
    /// Count of consecutive disconnect respawns that happened less than 1 minute after the previous one.
    fast_respawn_count: u8,
}
// Compile time check that `Controller` is `Send + Sync`, the function is never called,
// it only needs to type check in test builds.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
66impl Controller {
67    /// Start with a custom view process.
68    ///
69    /// The `view_process_exe` must be an executable that starts a view server.
70    /// Note that the [`VERSION`] of this crate must match in both executables.
71    ///
72    /// The `view_process_env` can be set to any env var needed to start the view-process. Note that if `view_process_exe`
73    /// is the current executable this most likely need set `zng_env::PROCESS_MAIN`.
74    ///
75    /// The `on_event` closure is called in another thread every time the app receives an event.
76    ///
77    /// # Tests
78    ///
79    /// The [`current_exe`] cannot be used in tests, you should set an external view-process executable. Unfortunately there
80    /// is no way to check if `start` was called in a test so we cannot provide an error message for this.
81    /// If the test is hanging in debug builds or has a timeout error in release builds this is probably the reason.
82    ///
83    /// # Connect Timeout
84    ///
85    /// If the view process takes longer than 10 seconds to connect it is considered failed and a respawn will be attempted.
86    /// This timeout is very reasonable in most cases, specially since users definitely need some visual feedback sooner, but
87    /// some test runner machines can be very slow. You can can set the `"ZNG_VIEW_TIMEOUT"` variable to a custom timeout in
88    /// seconds. The minimum value is 5 seconds. This timeout value is also used to define a *not responding* respawn.
89    ///
90    /// [`current_exe`]: std::env::current_exe
91    /// [`VERSION`]: crate::VERSION
92    pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
93    where
94        F: FnMut(Event) + Send + 'static,
95    {
96        Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
97    }
98    fn start_impl(
99        view_process_exe: PathBuf,
100        view_process_env: HashMap<Txt, Txt>,
101        headless: bool,
102        on_event: Box<dyn FnMut(Event) + Send>,
103    ) -> Self {
104        if ViewConfig::from_env().is_some() {
105            panic!("cannot start Controller in process configured to be view-process");
106        }
107
108        let (process, request_sender, response_receiver, event_receiver) =
109            Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
110        let same_process = process.is_none();
111        let process = Arc::new(Mutex::new(process.map(|p| (p, false))));
112        let ev = if same_process {
113            Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
114        } else {
115            Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
116        };
117
118        let mut c = Controller {
119            same_process,
120            view_state: ViewState::NotRunning,
121            process,
122            view_process_exe,
123            view_process_env,
124            request_sender,
125            response_receiver,
126            event_listener: Some(ev),
127            headless,
128            generation: ViewProcessGen::first(),
129            is_respawn: false,
130            last_respawn: None,
131            fast_respawn_count: 0,
132        };
133
134        if let Err(ipc::ViewChannelError::Disconnected) = c.try_init() {
135            panic!("respawn on init");
136        }
137
138        c
139    }
140    fn spawn_same_process_listener(
141        mut on_event: Box<dyn FnMut(Event) + Send>,
142        mut event_receiver: EventReceiver,
143        generation: ViewProcessGen,
144    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
145        thread::Builder::new()
146            .name("same_process_listener".into())
147            .spawn(move || {
148                while let Ok(ev) = event_receiver.recv() {
149                    on_event(ev);
150                }
151                on_event(Event::Disconnected(generation));
152
153                // return to reuse in respawn.
154                on_event
155            })
156            .expect("failed to spawn thread")
157    }
158    fn spawn_other_process_listener(
159        mut on_event: Box<dyn FnMut(Event) + Send>,
160        mut event_receiver: EventReceiver,
161        process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
162        generation: ViewProcessGen,
163    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
164        // spawns a thread that receives view-process events and monitors for process responsiveness
165        // - ipc-channel sometimes does not signal disconnect when the view-process dies, this monitors the process state every second.
166        // - app-process pings every 2s of inactivity, this kills the view-process it it does not respond for more them ZNG_VIEW_TIMEOUT.
167        thread::Builder::new()
168            .name("other_process_listener".into())
169            .spawn(move || {
170                const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
171                let timeout = view_timeout();
172                let mut check_count = 0u64;
173                while let Ok(maybe) = event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
174                    match maybe {
175                        Some(ev) => {
176                            check_count = 0;
177                            on_event(ev)
178                        }
179                        None => {
180                            if let Some(p) = &mut *process.lock() {
181                                match p.0.try_wait() {
182                                    Ok(c) => {
183                                        if c.is_some() {
184                                            // view-process died
185                                            break;
186                                        } else {
187                                            check_count += 1;
188                                            if check_count == timeout {
189                                                tracing::error!("view-process not responding for {timeout}s, will respawn");
190                                                let _ = p.0.kill();
191                                                p.1 = true;
192                                                break;
193                                            }
194                                        }
195                                    }
196                                    Err(e) => {
197                                        if e.kind() != std::io::ErrorKind::Interrupted {
198                                            tracing::error!("view-process try_wait error after inactivity, {e}");
199                                            break;
200                                        }
201                                    }
202                                }
203                            } else {
204                                // respawning already
205                                break;
206                            }
207                        }
208                    }
209                }
210                on_event(Event::Disconnected(generation));
211
212                // return to reuse in respawn.
213                on_event
214            })
215            .expect("failed to spawn thread")
216    }
217
218    fn try_init(&mut self) -> VpResult<()> {
219        self.init(self.generation, self.is_respawn, self.headless)?;
220        Ok(())
221    }
222
223    /// View-process is running, connected and ready to respond.
224    pub fn is_connected(&self) -> bool {
225        matches!(self.view_state, ViewState::RunningAndConnected)
226    }
227
    /// Generation of the current view-process.
    ///
    /// Starts at [`ViewProcessGen::first`] and changes to the next generation on every respawn.
    pub fn generation(&self) -> ViewProcessGen {
        self.generation
    }
232
    /// If the view-process was started in headless mode.
    ///
    /// This is the `headless` value given to [`Controller::start`].
    pub fn headless(&self) -> bool {
        self.headless
    }
237
    /// If both view and app are running in the same process.
    ///
    /// Is `true` when no child process was spawned on start.
    pub fn same_process(&self) -> bool {
        self.same_process
    }
242
243    fn disconnected_err(&self) -> Result<(), ipc::ViewChannelError> {
244        if self.is_connected() {
245            Ok(())
246        } else {
247            Err(ipc::ViewChannelError::Disconnected)
248        }
249    }
250
251    fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
252        self.request_sender.send(req)?;
253        self.response_receiver.recv()
254    }
255    pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
256        debug_assert!(req.expect_response());
257
258        if req.must_be_connected() {
259            self.disconnected_err()?;
260        }
261
262        match self.try_talk(req) {
263            Ok(r) => Ok(r),
264            Err(ipc::ViewChannelError::Disconnected) => {
265                self.handle_disconnect(self.generation);
266                Err(ipc::ViewChannelError::Disconnected)
267            }
268        }
269    }
270
271    pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
272        debug_assert!(!req.expect_response());
273
274        if req.must_be_connected() {
275            self.disconnected_err()?;
276        }
277
278        match self.request_sender.send(req) {
279            Ok(_) => Ok(()),
280            Err(ipc::ViewChannelError::Disconnected) => {
281                self.handle_disconnect(self.generation);
282                Err(ipc::ViewChannelError::Disconnected)
283            }
284        }
285    }
286
    /// Spawns the view-process and connects with it.
    ///
    /// In same process mode no process is spawned, the view config is set for the awaiting view and the
    /// returned process is `None`. Otherwise the `view_process_exe` is spawned with the `view_process_env`
    /// variables plus the variables that define the version, server name and mode.
    ///
    /// Returns the spawned process, if any, and the request, response and event channels.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned or if the connection fails. On connection failure
    /// the new process is killed and its exit code is logged.
    ///
    /// # Exit
    ///
    /// Exits the current process if the connection fails and the exit code of the killed view-process
    /// indicates an API version mismatch.
    ///
    /// # Panics
    ///
    /// Panics in builds without `ipc` if the view is not awaiting in the same process.
    fn spawn_view_process(
        view_process_exe: &Path,
        view_process_env: &HashMap<Txt, Txt>,
        headless: bool,
    ) -> AnyResult<(
        Option<std::process::Child>,
        ipc::RequestSender,
        ipc::ResponseReceiver,
        ipc::EventReceiver,
    )> {
        let _span = tracing::trace_span!("spawn_view_process").entered();

        let init = ipc::AppInit::new();

        // create process and spawn it, unless is running in same process mode.
        let process = if ViewConfig::is_awaiting_same_process() {
            ViewConfig::set_same_process(ViewConfig {
                version: crate::VERSION.into(),
                server_name: Txt::from_str(init.name()),
                headless,
            });
            None
        } else {
            #[cfg(not(ipc))]
            {
                let _ = (view_process_exe, view_process_env);
                panic!("expected only same_process mode with `ipc` feature disabled");
            }

            #[cfg(ipc)]
            {
                let mut process = std::process::Command::new(view_process_exe);
                for (name, val) in view_process_env {
                    process.env(name, val);
                }
                // the config variables are set after the custom variables, so they cannot be overridden.
                let process = process
                    .env(VIEW_VERSION, crate::VERSION)
                    .env(VIEW_SERVER, init.name())
                    .env(VIEW_MODE, if headless { "headless" } else { "headed" })
                    .env("RUST_BACKTRACE", "full")
                    .spawn()?;
                Some(process)
            }
        };

        let (req, rsp, ev) = match init.connect() {
            Ok(r) => r,
            Err(e) => {
                // connection failed, cleanup the new process and log why it may have failed.
                #[cfg(ipc)]
                if let Some(mut p) = process {
                    if let Err(ke) = p.kill() {
                        tracing::error!(
                            "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
                        );
                    } else {
                        match p.wait() {
                            Ok(output) => {
                                let code = output.code();
                                if ViewConfig::is_version_err(code, None) {
                                    let code = code.unwrap_or(1);
                                    tracing::error!(
                                        "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                                will exit app-process with code 0x{code:x}"
                                    );
                                    zng_env::exit(code);
                                } else {
                                    tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
                                }
                            }
                            Err(e) => {
                                tracing::error!("failed to read output status of killed view-process, {e}");
                            }
                        }
                    }
                } else {
                    tracing::error!("failed to connect with same process");
                }
                return Err(e);
            }
        };

        Ok((process, req, rsp, ev))
    }
370
371    /// Handle an [`Event::Inited`].
372    ///
373    /// Set the connected flag to `true`.
374    pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
375        match self.view_state {
376            ViewState::NotRunning => {
377                if self.generation == vp_gen {
378                    // crash respawn already sets gen
379                    self.view_state = ViewState::RunningAndConnected;
380                }
381            }
382            ViewState::Suspended => {
383                self.generation = vp_gen;
384                self.view_state = ViewState::RunningAndConnected;
385            }
386            ViewState::RunningAndConnected => {}
387        }
388    }
389
    /// Handle an [`Event::Suspended`].
    ///
    /// Sets the connected flag to `false`, the controller is connected again when the next
    /// [`Event::Inited`] is handled.
    pub fn handle_suspended(&mut self) {
        self.view_state = ViewState::Suspended;
    }
396
397    /// Handle an [`Event::Disconnected`].
398    ///
399    /// The `gen` parameter is the generation provided by the event. It is used to determinate if the disconnect has
400    /// not been handled already.
401    ///
402    /// Tries to cleanup the old view-process and start a new one, if all is successful an [`Event::Inited`] is send.
403    ///
404    /// The old view-process exit code and std output is logged using the `vp_respawn` target.
405    ///
406    /// Exits the current process with code `1` if the view-process was killed by the user. In Windows this is if
407    /// the view-process exit code is `1`. In Unix if it was killed by SIGKILL, SIGSTOP, SIGINT.
408    ///
409    /// # Panics
410    ///
411    /// If the last five respawns happened all within 500ms of the previous respawn.
412    ///
413    /// If the an error happens three times when trying to spawn the new view-process.
414    ///
415    /// If another disconnect happens during the view-process startup dialog.
416    pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
417        if vp_gen == self.generation {
418            #[cfg(not(ipc))]
419            {
420                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
421            }
422
423            #[cfg(ipc)]
424            {
425                self.respawn_impl(true)
426            }
427        } else {
428            tracing::warn!("disconnected event from previous generation ignored")
429        }
430    }
431
    /// Reopen the view-process, causing another [`Event::Inited`].
    ///
    /// This is similar to [`handle_disconnect`] but the current process does not
    /// exit depending on the view-process exit code, the old view-process is always killed
    /// and the respawn does not count as a crash.
    ///
    /// In builds without `ipc` there is no respawn, only an error is logged.
    ///
    /// [`handle_disconnect`]: Controller::handle_disconnect
    pub fn respawn(&mut self) {
        #[cfg(not(ipc))]
        {
            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
        }

        #[cfg(ipc)]
        self.respawn_impl(false);
    }
    /// Shared implementation of [`handle_disconnect`] (`is_crash` is `true`) and [`respawn`] (`is_crash` is `false`).
    ///
    /// Takes the current view-process out of the controller, waits or kills it, logs the exit status using
    /// the `vp_respawn` target, recovers the `on_event` closure from the old listener thread and then spawns
    /// and inits a new view-process in the next generation.
    ///
    /// Returns early, after marking the view-process as not running, if there is no process to take.
    ///
    /// # Panics
    ///
    /// Panics on the second crash respawn in a row that happens less than 1 minute after the previous one,
    /// if the new view-process fails to spawn 3 times, if the new view-process disconnects during init
    /// and if the old listener thread panicked.
    ///
    /// [`handle_disconnect`]: Controller::handle_disconnect
    /// [`respawn`]: Controller::respawn
    #[cfg(ipc)]
    fn respawn_impl(&mut self, is_crash: bool) {
        use zng_unit::TimeUnits;

        self.view_state = ViewState::NotRunning;
        self.is_respawn = true;

        // taking the process also signals the listener thread that a respawn is in progress.
        let (mut process, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
            p
        } else {
            if self.same_process {
                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
            }
            return;
        };
        if is_crash {
            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
        }

        // detect crash loops, only respawns caused by disconnect are counted.
        if is_crash {
            let t = Instant::now();
            if let Some(last_respawn) = self.last_respawn {
                if t - last_respawn < Duration::from_secs(60) {
                    self.fast_respawn_count += 1;
                    if self.fast_respawn_count == 2 {
                        panic!("disconnect respawn happened 2 times in less than 1 minute");
                    }
                } else {
                    self.fast_respawn_count = 0;
                }
            }
            self.last_respawn = Some(t);
        } else {
            self.last_respawn = None;
        }

        // try exit
        if !is_crash {
            // requested respawn, the exit code is not relevant.
            let _ = process.kill();
            killed_by_us = true;
        } else if !matches!(process.try_wait(), Ok(Some(_))) {
            // if not exited, give the process 300ms to close with the preferred exit code.
            thread::sleep(300.ms());

            if !matches!(process.try_wait(), Ok(Some(_))) {
                // if still not exited, kill it.
                killed_by_us = true;
                let _ = process.kill();
            }
        }

        let exit_status = match process.wait() {
            Ok(c) => Some(c),
            Err(e) => {
                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
                None
            }
        };

        // try to log the exit code and signal.
        if let Some(c) = exit_status {
            tracing::info!(target: "vp_respawn", "view-process killed");

            let code = c.code();
            // only assigned in Unix builds.
            #[allow(unused_mut)]
            let mut signal = None::<i32>;

            if !killed_by_us {
                // check if user killed the view-process, in this case we exit too.

                #[cfg(windows)]
                if code == Some(1) {
                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
                                        will exit app-process with the same code");
                    zng_env::exit(1);
                }

                #[cfg(unix)]
                if code.is_none() {
                    use std::os::unix::process::ExitStatusExt as _;
                    signal = c.signal();

                    // NOTE(review): 2 and 9 are SIGINT and SIGKILL, 17, 19 and 23 appear to be the values
                    // of SIGSTOP in different architectures - confirm against signal(7).
                    if let Some(sig) = signal
                        && [2, 9, 17, 19, 23].contains(&sig)
                    {
                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
                                            will exit app-process with code 1");
                        zng_env::exit(1);
                    }
                }
            }

            if !killed_by_us {
                let code = code.unwrap_or(0);
                let signal = signal.unwrap_or(0);
                tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
            }

            // a respawn cannot fix a version mismatch.
            if ViewConfig::is_version_err(code, None) {
                let code = code.unwrap_or(1);
                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                        will exit app-process with code 0x{code:x}");
                zng_env::exit(code);
            }
        } else {
            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
        }

        // recover event listener closure (in a box), a panic in the listener thread is propagated.
        let on_event = match self.event_listener.take().unwrap().join() {
            Ok(fn_) => fn_,
            Err(p) => panic::resume_unwind(p),
        };

        // respawn
        let mut retries = 3;
        let (new_process, request, response, event_listener) = loop {
            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
                Ok(r) => break r,
                Err(e) => {
                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
                    retries -= 1;
                    if retries == 0 {
                        panic!("failed to respawn `view-process` after 3 retries");
                    }
                    tracing::info!(target: "vp_respawn", "retrying respawn");
                }
            }
        };

        // update connections
        self.process = Arc::new(Mutex::new(Some((new_process.unwrap(), false))));
        self.request_sender = request;
        self.response_receiver = response;

        let next_id = self.generation.next();
        self.generation = next_id;

        // the new listener reuses the closure and monitors the new process.
        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
        self.event_listener = Some(ev);

        if let Err(ipc::ViewChannelError::Disconnected) = self.try_init() {
            panic!("respawn on respawn startup");
        }
    }
592}
593impl Drop for Controller {
594    /// Kills the View Process, unless it is running in the same process.
595    fn drop(&mut self) {
596        let _ = self.exit();
597        #[cfg(ipc)]
598        if let Some((mut process, _)) = self.process.lock().take()
599            && process.try_wait().is_err()
600        {
601            std::thread::sleep(Duration::from_secs(1));
602            if process.try_wait().is_err() {
603                tracing::error!("view-process did not exit after 1s, killing");
604                let _ = process.kill();
605                let _ = process.wait();
606            }
607        }
608    }
609}
610
611const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
612/// Timeout in seconds.
613pub(crate) fn view_timeout() -> u64 {
614    match std::env::var(VIEW_TIMEOUT) {
615        Ok(s) if !s.is_empty() => match s.parse::<u64>() {
616            Ok(s) => s.max(5),
617            Err(e) => {
618                tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
619                10
620            }
621        },
622        _ => 10,
623    }
624}