zng_view_api/
app_process.rs

1use std::{
2    collections::HashMap,
3    panic,
4    path::{Path, PathBuf},
5    sync::Arc,
6    thread::{self, JoinHandle},
7    time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_txt::Txt;
14
15use crate::{
16    AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
17    ipc::{self, EventReceiver},
18};
19
/// Join handle of the event listener thread.
///
/// The listener returns the event handler closure on join, so it can be reused by the
/// listener thread spawned for the next view-process on respawn.
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
22
/// Name of the environment variable set on the spawned view-process with the `crate::VERSION` of the app-process.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on the spawned view-process with the name of the IPC server it must connect to.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on the spawned view-process with the mode, `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
26
/// Connection state of the view-process, as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// State of a new controller and state set at the start of a respawn, it remains until an
    /// `Inited` event for the current generation is handled.
    NotRunning,
    /// Set by [`Controller::handle_inited`], this is the only state reported as connected.
    RunningAndConnected,
    /// Set by [`Controller::handle_suspended`], the next handled `Inited` event defines the new generation.
    Suspended,
}
33
/// View Process controller, used in the App Process.
///
/// # Exit
///
/// The View Process is [killed] when the controller is dropped, if the app is running in same process mode
/// then the current process [exits] with code 0 on drop.
///
/// In multi-process mode the View Process is also killed to respawn if it does not send any event after 30 seconds,
/// the app must call [`Controller::ping`] periodically to generate the [`Event::Pong`] to detect availability.
///
/// [killed]: std::process::Child::kill
/// [exits]: std::process::exit
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    // Child view-process, shared with the event listener thread that monitors it.
    // Is `None` in same process mode and after the child is taken for respawn or drop.
    process: Arc<Mutex<Option<std::process::Child>>>,
    // Connection state, see `ViewState`.
    view_state: ViewState,
    // Generation of the current view-process, advances on every respawn.
    generation: ViewProcessGen,
    // Set to `true` by the first respawn, forwarded to the view-process in the init request.
    is_respawn: bool,
    // Executable and extra environment variables used to spawn (and respawn) the view-process.
    view_process_exe: PathBuf,
    view_process_env: HashMap<Txt, Txt>,
    // Request/response channels for the current view-process, replaced on respawn.
    request_sender: ipc::RequestSender,
    response_receiver: ipc::ResponseReceiver,
    // Listener thread handle, joined on respawn to recover the event handler closure.
    event_listener: Option<EventListenerJoin>,
    headless: bool,
    // `true` when no child process was spawned on start.
    same_process: bool,
    // Instant of the last respawn caused by a disconnect, cleared by manual respawns.
    last_respawn: Option<Instant>,
    // Count of disconnect respawns that happened less than 1 minute after the previous one.
    fast_respawn_count: u8,
}
// Compile-time check that `Controller` is `Send + Sync`, the function is never called.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
66impl Controller {
67    /// Start with a custom view process.
68    ///
69    /// The `view_process_exe` must be an executable that starts a view server.
70    /// Note that the [`VERSION`] of this crate must match in both executables.
71    ///
72    /// The `view_process_env` can be set to any env var needed to start the view-process. Note that if `view_process_exe`
73    /// is the current executable this most likely need set `zng_env::PROCESS_MAIN`.
74    ///
75    /// The `on_event` closure is called in another thread every time the app receives an event.
76    ///
77    /// # Tests
78    ///
79    /// The [`current_exe`] cannot be used in tests, you should set an external view-process executable. Unfortunately there
80    /// is no way to check if `start` was called in a test so we cannot provide an error message for this.
81    /// If the test is hanging in debug builds or has a timeout error in release builds this is probably the reason.
82    ///
83    /// [`current_exe`]: std::env::current_exe
84    /// [`VERSION`]: crate::VERSION
85    pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
86    where
87        F: FnMut(Event) + Send + 'static,
88    {
89        Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
90    }
91    fn start_impl(
92        view_process_exe: PathBuf,
93        view_process_env: HashMap<Txt, Txt>,
94        headless: bool,
95        on_event: Box<dyn FnMut(Event) + Send>,
96    ) -> Self {
97        if ViewConfig::from_env().is_some() {
98            panic!("cannot start Controller in process configured to be view-process");
99        }
100
101        let (process, request_sender, response_receiver, event_receiver) =
102            Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
103        let same_process = process.is_none();
104        let process = Arc::new(Mutex::new(process));
105        let ev = if same_process {
106            Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
107        } else {
108            Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
109        };
110
111        let mut c = Controller {
112            same_process,
113            view_state: ViewState::NotRunning,
114            process,
115            view_process_exe,
116            view_process_env,
117            request_sender,
118            response_receiver,
119            event_listener: Some(ev),
120            headless,
121            generation: ViewProcessGen::first(),
122            is_respawn: false,
123            last_respawn: None,
124            fast_respawn_count: 0,
125        };
126
127        if let Err(ipc::ViewChannelError::Disconnected) = c.try_init() {
128            panic!("respawn on init");
129        }
130
131        c
132    }
133    fn spawn_same_process_listener(
134        mut on_event: Box<dyn FnMut(Event) + Send>,
135        mut event_receiver: EventReceiver,
136        generation: ViewProcessGen,
137    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
138        thread::spawn(move || {
139            while let Ok(ev) = event_receiver.recv() {
140                on_event(ev);
141            }
142            on_event(Event::Disconnected(generation));
143
144            // return to reuse in respawn.
145            on_event
146        })
147    }
148    fn spawn_other_process_listener(
149        mut on_event: Box<dyn FnMut(Event) + Send>,
150        mut event_receiver: EventReceiver,
151        process: Arc<Mutex<Option<std::process::Child>>>,
152        generation: ViewProcessGen,
153    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
154        // spawns a thread that receives view-process events and monitors for process responsiveness
155        // - ipc-channel sometimes does not signal disconnect when the view-process dies, this monitors the process state every second.
156        // - app-process pings every 10s of inactivity, this kills the view-process it it does not respond for more them 30s.
157        thread::spawn(move || {
158            const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
159            const TIMEOUT_SECS: u8 = 30;
160            let mut check_count = 0u8;
161            while let Ok(maybe) = event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
162                match maybe {
163                    Some(ev) => {
164                        check_count = 0;
165                        on_event(ev)
166                    }
167                    None => {
168                        if let Some(p) = &mut *process.lock() {
169                            match p.try_wait() {
170                                Ok(c) => {
171                                    if c.is_some() {
172                                        // view-process died
173                                        break;
174                                    } else {
175                                        check_count += 1;
176                                        if check_count == TIMEOUT_SECS {
177                                            tracing::error!("view-process not responding for {TIMEOUT_SECS}s, will respawn");
178                                            let _ = p.kill();
179                                            break;
180                                        }
181                                    }
182                                }
183                                Err(e) => {
184                                    if e.kind() != std::io::ErrorKind::Interrupted {
185                                        tracing::error!("view-process try_wait error after inactivity, {e}");
186                                        break;
187                                    }
188                                }
189                            }
190                        } else {
191                            // respawning already
192                            break;
193                        }
194                    }
195                }
196            }
197            on_event(Event::Disconnected(generation));
198
199            // return to reuse in respawn.
200            on_event
201        })
202    }
203
204    fn try_init(&mut self) -> VpResult<()> {
205        self.init(self.generation, self.is_respawn, self.headless)?;
206        Ok(())
207    }
208
209    /// View-process is running, connected and ready to respond.
210    pub fn is_connected(&self) -> bool {
211        matches!(self.view_state, ViewState::RunningAndConnected)
212    }
213
    /// View-process generation.
    ///
    /// The generation advances every time the view-process is respawned.
    pub fn generation(&self) -> ViewProcessGen {
        self.generation
    }
218
    /// If is running in headless mode.
    ///
    /// This is the value given on start, it is also used for respawns.
    pub fn headless(&self) -> bool {
        self.headless
    }
223
    /// If is running both view and app in the same process.
    ///
    /// This is `true` when no child process was spawned on start.
    pub fn same_process(&self) -> bool {
        self.same_process
    }
228
229    fn disconnected_err(&self) -> Result<(), ipc::ViewChannelError> {
230        if self.is_connected() {
231            Ok(())
232        } else {
233            Err(ipc::ViewChannelError::Disconnected)
234        }
235    }
236
237    fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
238        self.request_sender.send(req)?;
239        self.response_receiver.recv()
240    }
241    pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
242        debug_assert!(req.expect_response());
243
244        if req.must_be_connected() {
245            self.disconnected_err()?;
246        }
247
248        match self.try_talk(req) {
249            Ok(r) => Ok(r),
250            Err(ipc::ViewChannelError::Disconnected) => {
251                self.handle_disconnect(self.generation);
252                Err(ipc::ViewChannelError::Disconnected)
253            }
254        }
255    }
256
257    pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
258        debug_assert!(!req.expect_response());
259
260        if req.must_be_connected() {
261            self.disconnected_err()?;
262        }
263
264        match self.request_sender.send(req) {
265            Ok(_) => Ok(()),
266            Err(ipc::ViewChannelError::Disconnected) => {
267                self.handle_disconnect(self.generation);
268                Err(ipc::ViewChannelError::Disconnected)
269            }
270        }
271    }
272
273    fn spawn_view_process(
274        view_process_exe: &Path,
275        view_process_env: &HashMap<Txt, Txt>,
276        headless: bool,
277    ) -> AnyResult<(
278        Option<std::process::Child>,
279        ipc::RequestSender,
280        ipc::ResponseReceiver,
281        ipc::EventReceiver,
282    )> {
283        let _span = tracing::trace_span!("spawn_view_process").entered();
284
285        let init = ipc::AppInit::new();
286
287        // create process and spawn it, unless is running in same process mode.
288        let process = if ViewConfig::is_awaiting_same_process() {
289            ViewConfig::set_same_process(ViewConfig {
290                version: crate::VERSION.into(),
291                server_name: Txt::from_str(init.name()),
292                headless,
293            });
294            None
295        } else {
296            #[cfg(not(ipc))]
297            {
298                let _ = (view_process_exe, view_process_env);
299                panic!("expected only same_process mode with `ipc` feature disabled");
300            }
301
302            #[cfg(ipc)]
303            {
304                let mut process = std::process::Command::new(view_process_exe);
305                for (name, val) in view_process_env {
306                    process.env(name, val);
307                }
308                let process = process
309                    .env(VIEW_VERSION, crate::VERSION)
310                    .env(VIEW_SERVER, init.name())
311                    .env(VIEW_MODE, if headless { "headless" } else { "headed" })
312                    .env("RUST_BACKTRACE", "full")
313                    .spawn()?;
314                Some(process)
315            }
316        };
317
318        let (req, rsp, ev) = match init.connect() {
319            Ok(r) => r,
320            Err(e) => {
321                #[cfg(ipc)]
322                if let Some(mut p) = process {
323                    if let Err(ke) = p.kill() {
324                        tracing::error!(
325                            "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
326                        );
327                    } else {
328                        match p.wait() {
329                            Ok(output) => {
330                                let code = output.code();
331                                if ViewConfig::is_version_err(code, None) {
332                                    let code = code.unwrap_or(1);
333                                    tracing::error!(
334                                        "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
335                                                will exit app-process with code 0x{code:x}"
336                                    );
337                                    zng_env::exit(code);
338                                } else {
339                                    tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
340                                }
341                            }
342                            Err(e) => {
343                                tracing::error!("failed to read output status of killed view-process, {e}");
344                            }
345                        }
346                    }
347                } else {
348                    tracing::error!("failed to connect with same process");
349                }
350                return Err(e);
351            }
352        };
353
354        Ok((process, req, rsp, ev))
355    }
356
357    /// Handle an [`Event::Inited`].
358    ///
359    /// Set the connected flag to `true`.
360    pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
361        match self.view_state {
362            ViewState::NotRunning => {
363                if self.generation == vp_gen {
364                    // crash respawn already sets gen
365                    self.view_state = ViewState::RunningAndConnected;
366                }
367            }
368            ViewState::Suspended => {
369                self.generation = vp_gen;
370                self.view_state = ViewState::RunningAndConnected;
371            }
372            ViewState::RunningAndConnected => {}
373        }
374    }
375
    /// Handle an [`Event::Suspended`].
    ///
    /// Set the connected flag to `false`.
    pub fn handle_suspended(&mut self) {
        self.view_state = ViewState::Suspended;
    }
382
383    /// Handle an [`Event::Disconnected`].
384    ///
385    /// The `gen` parameter is the generation provided by the event. It is used to determinate if the disconnect has
386    /// not been handled already.
387    ///
388    /// Tries to cleanup the old view-process and start a new one, if all is successful an [`Event::Inited`] is send.
389    ///
390    /// The old view-process exit code and std output is logged using the `vp_respawn` target.
391    ///
392    /// Exits the current process with code `1` if the view-process was killed by the user. In Windows this is if
393    /// the view-process exit code is `1`. In Unix if it was killed by SIGKILL, SIGSTOP, SIGINT.
394    ///
395    /// # Panics
396    ///
397    /// If the last five respawns happened all within 500ms of the previous respawn.
398    ///
399    /// If the an error happens three times when trying to spawn the new view-process.
400    ///
401    /// If another disconnect happens during the view-process startup dialog.
402    pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
403        if vp_gen == self.generation {
404            #[cfg(not(ipc))]
405            {
406                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
407            }
408
409            #[cfg(ipc)]
410            {
411                self.respawn_impl(true)
412            }
413        } else {
414            tracing::warn!("disconnected event from previous generation ignored")
415        }
416    }
417
418    /// Reopen the view-process, causing another [`Event::Inited`].
419    ///
420    /// This is similar to [`handle_disconnect`] but the current process does not
421    /// exit depending on the view-process exit code.
422    ///
423    /// [`handle_disconnect`]: Controller::handle_disconnect
424    pub fn respawn(&mut self) {
425        #[cfg(not(ipc))]
426        {
427            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
428        }
429
430        #[cfg(ipc)]
431        self.respawn_impl(false);
432    }
433    #[cfg(ipc)]
434    fn respawn_impl(&mut self, is_crash: bool) {
435        use zng_unit::TimeUnits;
436
437        self.view_state = ViewState::NotRunning;
438        self.is_respawn = true;
439
440        let mut process = if let Some(p) = self.process.lock().take() {
441            p
442        } else {
443            if self.same_process {
444                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
445            }
446            return;
447        };
448        if is_crash {
449            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
450        }
451
452        if is_crash {
453            let t = Instant::now();
454            if let Some(last_respawn) = self.last_respawn {
455                if t - last_respawn < Duration::from_secs(60) {
456                    self.fast_respawn_count += 1;
457                    if self.fast_respawn_count == 2 {
458                        panic!("disconnect respawn happened 2 times in less than 1 minute");
459                    }
460                } else {
461                    self.fast_respawn_count = 0;
462                }
463            }
464            self.last_respawn = Some(t);
465        } else {
466            self.last_respawn = None;
467        }
468
469        // try exit
470        let mut killed_by_us = false;
471        if !is_crash {
472            let _ = process.kill();
473            killed_by_us = true;
474        } else if !matches!(process.try_wait(), Ok(Some(_))) {
475            // if not exited, give the process 300ms to close with the preferred exit code.
476            thread::sleep(300.ms());
477
478            if !matches!(process.try_wait(), Ok(Some(_))) {
479                // if still not exited, kill it.
480                killed_by_us = true;
481                let _ = process.kill();
482            }
483        }
484
485        let code_and_output = match process.wait() {
486            Ok(c) => Some(c),
487            Err(e) => {
488                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
489                None
490            }
491        };
492
493        // try print stdout/err and exit code.
494        if let Some(c) = code_and_output {
495            tracing::info!(target: "vp_respawn", "view-process killed");
496
497            let code = c.code();
498            #[allow(unused_mut)]
499            let mut signal = None::<i32>;
500
501            if !killed_by_us {
502                // check if user killed the view-process, in this case we exit too.
503
504                #[cfg(windows)]
505                if code == Some(1) {
506                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
507                                        will exit app-process with the same code");
508                    zng_env::exit(1);
509                }
510
511                #[cfg(unix)]
512                if code.is_none() {
513                    use std::os::unix::process::ExitStatusExt as _;
514                    signal = c.signal();
515
516                    if let Some(sig) = signal
517                        && [2, 9, 17, 19, 23].contains(&sig)
518                    {
519                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
520                                            will exit app-process with code 1");
521                        zng_env::exit(1);
522                    }
523                }
524            }
525
526            if !killed_by_us {
527                let code = code.unwrap_or(0);
528                let signal = signal.unwrap_or(0);
529                tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
530            }
531
532            if ViewConfig::is_version_err(code, None) {
533                let code = code.unwrap_or(1);
534                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
535                                        will exit app-process with code 0x{code:x}");
536                zng_env::exit(code);
537            }
538        } else {
539            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
540        }
541
542        // recover event listener closure (in a box).
543        let on_event = match self.event_listener.take().unwrap().join() {
544            Ok(fn_) => fn_,
545            Err(p) => panic::resume_unwind(p),
546        };
547
548        // respawn
549        let mut retries = 3;
550        let (new_process, request, response, event_listener) = loop {
551            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
552                Ok(r) => break r,
553                Err(e) => {
554                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
555                    retries -= 1;
556                    if retries == 0 {
557                        panic!("failed to respawn `view-process` after 3 retries");
558                    }
559                    tracing::info!(target: "vp_respawn", "retrying respawn");
560                }
561            }
562        };
563        debug_assert!(new_process.is_some());
564
565        // update connections
566        self.process = Arc::new(Mutex::new(new_process));
567        self.request_sender = request;
568        self.response_receiver = response;
569
570        let next_id = self.generation.next();
571        self.generation = next_id;
572
573        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
574        self.event_listener = Some(ev);
575
576        if let Err(ipc::ViewChannelError::Disconnected) = self.try_init() {
577            panic!("respawn on respawn startup");
578        }
579    }
580}
581impl Drop for Controller {
582    /// Kills the View Process, unless it is running in the same process.
583    fn drop(&mut self) {
584        let _ = self.exit();
585        #[cfg(ipc)]
586        if let Some(mut process) = self.process.lock().take()
587            && process.try_wait().is_err()
588        {
589            std::thread::sleep(Duration::from_secs(1));
590            if process.try_wait().is_err() {
591                tracing::error!("view-process did not exit after 1s, killing");
592                let _ = process.kill();
593                let _ = process.wait();
594            }
595        }
596    }
597}