1use std::{
2 collections::HashMap,
3 panic,
4 path::{Path, PathBuf},
5 sync::Arc,
6 thread::{self, JoinHandle},
7 time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_txt::Txt;
14
15use crate::{
16 AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
17 ipc::{self, EventReceiver},
18};
19
/// Join handle of the event listener thread.
///
/// The listener thread returns the boxed event callback when it finishes, so joining the
/// handle recovers the callback for reuse by a new listener.
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
22
/// Name of the environment variable set on the spawned view-process with the API version.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on the spawned view-process with the IPC server name.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on the spawned view-process with the mode,
/// either `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
26
/// Connection state of the view-process as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// Initial state and state during a respawn, until the matching inited notification is handled.
    NotRunning,
    /// Inited notification was handled, requests that must be connected are allowed.
    RunningAndConnected,
    /// Suspended notification was handled, a new inited notification is required to reconnect.
    Suspended,
}
33
/// Owns the view-process (when it is a child process), the IPC channels connected to it
/// and the thread that listens to its events.
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    /// Child process handle, `None` in same-process mode or after it was taken for respawn/drop.
    /// Shared with the event listener thread that polls it for exit.
    process: Arc<Mutex<Option<std::process::Child>>>,
    /// Current connection state.
    view_state: ViewState,
    /// Generation of the current view-process, advances on every respawn.
    generation: ViewProcessGen,
    /// Set once a respawn was started, forwarded to the init request.
    is_respawn: bool,
    /// Executable used to spawn the view-process.
    view_process_exe: PathBuf,
    /// Extra environment variables set on the spawned view-process.
    view_process_env: HashMap<Txt, Txt>,
    /// Channel used to send requests.
    request_sender: ipc::RequestSender,
    /// Channel used to receive the responses of requests.
    response_receiver: ipc::ResponseReceiver,
    /// Listener thread handle, taken and joined during respawn to recover the event callback.
    event_listener: Option<EventListenerJoin>,
    /// Mode requested at start, forwarded to the view-process on every spawn.
    headless: bool,
    /// If no child process was spawned on start.
    same_process: bool,
    /// Instant of the last respawn caused by a disconnect, cleared by a requested respawn.
    last_respawn: Option<Instant>,
    /// Count of disconnect respawns that happened less than a minute after the previous one.
    fast_respawn_count: u8,
}
/// Compile-time check that [`Controller`] is `Send + Sync`, only type-checked in test builds.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
66impl Controller {
67 pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
86 where
87 F: FnMut(Event) + Send + 'static,
88 {
89 Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
90 }
91 fn start_impl(
92 view_process_exe: PathBuf,
93 view_process_env: HashMap<Txt, Txt>,
94 headless: bool,
95 on_event: Box<dyn FnMut(Event) + Send>,
96 ) -> Self {
97 if ViewConfig::from_env().is_some() {
98 panic!("cannot start Controller in process configured to be view-process");
99 }
100
101 let (process, request_sender, response_receiver, event_receiver) =
102 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
103 let same_process = process.is_none();
104 let process = Arc::new(Mutex::new(process));
105 let ev = if same_process {
106 Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
107 } else {
108 Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
109 };
110
111 let mut c = Controller {
112 same_process,
113 view_state: ViewState::NotRunning,
114 process,
115 view_process_exe,
116 view_process_env,
117 request_sender,
118 response_receiver,
119 event_listener: Some(ev),
120 headless,
121 generation: ViewProcessGen::first(),
122 is_respawn: false,
123 last_respawn: None,
124 fast_respawn_count: 0,
125 };
126
127 if let Err(ipc::ViewChannelError::Disconnected) = c.try_init() {
128 panic!("respawn on init");
129 }
130
131 c
132 }
133 fn spawn_same_process_listener(
134 mut on_event: Box<dyn FnMut(Event) + Send>,
135 mut event_receiver: EventReceiver,
136 generation: ViewProcessGen,
137 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
138 thread::spawn(move || {
139 while let Ok(ev) = event_receiver.recv() {
140 on_event(ev);
141 }
142 on_event(Event::Disconnected(generation));
143
144 on_event
146 })
147 }
148 fn spawn_other_process_listener(
149 mut on_event: Box<dyn FnMut(Event) + Send>,
150 mut event_receiver: EventReceiver,
151 process: Arc<Mutex<Option<std::process::Child>>>,
152 generation: ViewProcessGen,
153 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
154 thread::spawn(move || {
158 const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
159 const TIMEOUT_SECS: u8 = 30;
160 let mut check_count = 0u8;
161 while let Ok(maybe) = event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
162 match maybe {
163 Some(ev) => {
164 check_count = 0;
165 on_event(ev)
166 }
167 None => {
168 if let Some(p) = &mut *process.lock() {
169 match p.try_wait() {
170 Ok(c) => {
171 if c.is_some() {
172 break;
174 } else {
175 check_count += 1;
176 if check_count == TIMEOUT_SECS {
177 tracing::error!("view-process not responding for {TIMEOUT_SECS}s, will respawn");
178 let _ = p.kill();
179 break;
180 }
181 }
182 }
183 Err(e) => {
184 if e.kind() != std::io::ErrorKind::Interrupted {
185 tracing::error!("view-process try_wait error after inactivity, {e}");
186 break;
187 }
188 }
189 }
190 } else {
191 break;
193 }
194 }
195 }
196 }
197 on_event(Event::Disconnected(generation));
198
199 on_event
201 })
202 }
203
204 fn try_init(&mut self) -> VpResult<()> {
205 self.init(self.generation, self.is_respawn, self.headless)?;
206 Ok(())
207 }
208
209 pub fn is_connected(&self) -> bool {
211 matches!(self.view_state, ViewState::RunningAndConnected)
212 }
213
    /// Generation of the current view-process, changes when the view-process is respawned.
    pub fn generation(&self) -> ViewProcessGen {
        self.generation
    }
218
    /// The headless mode that was requested on start.
    pub fn headless(&self) -> bool {
        self.headless
    }
223
    /// If the view was started in the same process, no child process was spawned.
    pub fn same_process(&self) -> bool {
        self.same_process
    }
228
229 fn disconnected_err(&self) -> Result<(), ipc::ViewChannelError> {
230 if self.is_connected() {
231 Ok(())
232 } else {
233 Err(ipc::ViewChannelError::Disconnected)
234 }
235 }
236
237 fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
238 self.request_sender.send(req)?;
239 self.response_receiver.recv()
240 }
241 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
242 debug_assert!(req.expect_response());
243
244 if req.must_be_connected() {
245 self.disconnected_err()?;
246 }
247
248 match self.try_talk(req) {
249 Ok(r) => Ok(r),
250 Err(ipc::ViewChannelError::Disconnected) => {
251 self.handle_disconnect(self.generation);
252 Err(ipc::ViewChannelError::Disconnected)
253 }
254 }
255 }
256
257 pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
258 debug_assert!(!req.expect_response());
259
260 if req.must_be_connected() {
261 self.disconnected_err()?;
262 }
263
264 match self.request_sender.send(req) {
265 Ok(_) => Ok(()),
266 Err(ipc::ViewChannelError::Disconnected) => {
267 self.handle_disconnect(self.generation);
268 Err(ipc::ViewChannelError::Disconnected)
269 }
270 }
271 }
272
273 fn spawn_view_process(
274 view_process_exe: &Path,
275 view_process_env: &HashMap<Txt, Txt>,
276 headless: bool,
277 ) -> AnyResult<(
278 Option<std::process::Child>,
279 ipc::RequestSender,
280 ipc::ResponseReceiver,
281 ipc::EventReceiver,
282 )> {
283 let _span = tracing::trace_span!("spawn_view_process").entered();
284
285 let init = ipc::AppInit::new();
286
287 let process = if ViewConfig::is_awaiting_same_process() {
289 ViewConfig::set_same_process(ViewConfig {
290 version: crate::VERSION.into(),
291 server_name: Txt::from_str(init.name()),
292 headless,
293 });
294 None
295 } else {
296 #[cfg(not(ipc))]
297 {
298 let _ = (view_process_exe, view_process_env);
299 panic!("expected only same_process mode with `ipc` feature disabled");
300 }
301
302 #[cfg(ipc)]
303 {
304 let mut process = std::process::Command::new(view_process_exe);
305 for (name, val) in view_process_env {
306 process.env(name, val);
307 }
308 let process = process
309 .env(VIEW_VERSION, crate::VERSION)
310 .env(VIEW_SERVER, init.name())
311 .env(VIEW_MODE, if headless { "headless" } else { "headed" })
312 .env("RUST_BACKTRACE", "full")
313 .spawn()?;
314 Some(process)
315 }
316 };
317
318 let (req, rsp, ev) = match init.connect() {
319 Ok(r) => r,
320 Err(e) => {
321 #[cfg(ipc)]
322 if let Some(mut p) = process {
323 if let Err(ke) = p.kill() {
324 tracing::error!(
325 "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
326 );
327 } else {
328 match p.wait() {
329 Ok(output) => {
330 let code = output.code();
331 if ViewConfig::is_version_err(code, None) {
332 let code = code.unwrap_or(1);
333 tracing::error!(
334 "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
335 will exit app-process with code 0x{code:x}"
336 );
337 zng_env::exit(code);
338 } else {
339 tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
340 }
341 }
342 Err(e) => {
343 tracing::error!("failed to read output status of killed view-process, {e}");
344 }
345 }
346 }
347 } else {
348 tracing::error!("failed to connect with same process");
349 }
350 return Err(e);
351 }
352 };
353
354 Ok((process, req, rsp, ev))
355 }
356
357 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
361 match self.view_state {
362 ViewState::NotRunning => {
363 if self.generation == vp_gen {
364 self.view_state = ViewState::RunningAndConnected;
366 }
367 }
368 ViewState::Suspended => {
369 self.generation = vp_gen;
370 self.view_state = ViewState::RunningAndConnected;
371 }
372 ViewState::RunningAndConnected => {}
373 }
374 }
375
    /// Handles the suspended notification, the controller is not connected until the next
    /// inited notification is handled.
    pub fn handle_suspended(&mut self) {
        self.view_state = ViewState::Suspended;
    }
382
383 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
403 if vp_gen == self.generation {
404 #[cfg(not(ipc))]
405 {
406 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
407 }
408
409 #[cfg(ipc)]
410 {
411 self.respawn_impl(true)
412 }
413 } else {
414 tracing::warn!("disconnected event from previous generation ignored")
415 }
416 }
417
418 pub fn respawn(&mut self) {
425 #[cfg(not(ipc))]
426 {
427 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
428 }
429
430 #[cfg(ipc)]
431 self.respawn_impl(false);
432 }
    /// Replaces the current view-process with a new one.
    ///
    /// `is_crash` is `true` when the respawn is caused by a channel disconnect and `false`
    /// when it was requested.
    ///
    /// Does nothing more than resetting the state if there is no child process handle.
    ///
    /// # Panics
    ///
    /// Panics if a second crash respawn happens in less than one minute of the previous,
    /// if the listener thread panicked, if the view-process cannot be spawned after 3 attempts
    /// or if the new view-process disconnects during init.
    #[cfg(ipc)]
    fn respawn_impl(&mut self, is_crash: bool) {
        use zng_unit::TimeUnits;

        self.view_state = ViewState::NotRunning;
        self.is_respawn = true;

        // taking the handle also signals the listener thread to stop on its next check.
        let mut process = if let Some(p) = self.process.lock().take() {
            p
        } else {
            if self.same_process {
                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
            }
            return;
        };
        if is_crash {
            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
        }

        // crash loop detection, only disconnect respawns are counted.
        if is_crash {
            let t = Instant::now();
            if let Some(last_respawn) = self.last_respawn {
                if t - last_respawn < Duration::from_secs(60) {
                    self.fast_respawn_count += 1;
                    if self.fast_respawn_count == 2 {
                        panic!("disconnect respawn happened 2 times in less than 1 minute");
                    }
                } else {
                    self.fast_respawn_count = 0;
                }
            }
            self.last_respawn = Some(t);
        } else {
            self.last_respawn = None;
        }

        // make sure the old process is not running.
        let mut killed_by_us = false;
        if !is_crash {
            // requested respawn, kill immediately.
            let _ = process.kill();
            killed_by_us = true;
        } else if !matches!(process.try_wait(), Ok(Some(_))) {
            // disconnected but not confirmed as exited yet, wait a little before killing.
            thread::sleep(300.ms());

            if !matches!(process.try_wait(), Ok(Some(_))) {
                killed_by_us = true;
                let _ = process.kill();
            }
        }

        let code_and_output = match process.wait() {
            Ok(c) => Some(c),
            Err(e) => {
                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
                None
            }
        };

        if let Some(c) = code_and_output {
            tracing::info!(target: "vp_respawn", "view-process killed");

            let code = c.code();
            // only assigned on unix.
            #[allow(unused_mut)]
            let mut signal = None::<i32>;

            if !killed_by_us {
                // the process exited by itself, some exits are not treated as a crash,
                // the app-process exits too instead of respawning.
                #[cfg(windows)]
                if code == Some(1) {
                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
                                        will exit app-process with the same code");
                    zng_env::exit(1);
                }

                #[cfg(unix)]
                if code.is_none() {
                    use std::os::unix::process::ExitStatusExt as _;
                    signal = c.signal();

                    // NOTE(review): raw signal numbers, presumably chosen for a specific platform, confirm.
                    if let Some(sig) = signal
                        && [2, 9, 17, 19, 23].contains(&sig)
                    {
                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
                                            will exit app-process with code 1");
                        zng_env::exit(1);
                    }
                }
            }

            if !killed_by_us {
                let code = code.unwrap_or(0);
                let signal = signal.unwrap_or(0);
                tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
            }

            // a respawn cannot fix a version mismatch.
            if ViewConfig::is_version_err(code, None) {
                let code = code.unwrap_or(1);
                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                        will exit app-process with code 0x{code:x}");
                zng_env::exit(code);
            }
        } else {
            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
        }

        // recover the event callback from the old listener thread, propagates its panic if any.
        let on_event = match self.event_listener.take().unwrap().join() {
            Ok(fn_) => fn_,
            Err(p) => panic::resume_unwind(p),
        };

        let mut retries = 3;
        let (new_process, request, response, event_listener) = loop {
            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
                Ok(r) => break r,
                Err(e) => {
                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
                    retries -= 1;
                    if retries == 0 {
                        panic!("failed to respawn `view-process` after 3 retries");
                    }
                    tracing::info!(target: "vp_respawn", "retrying respawn");
                }
            }
        };
        debug_assert!(new_process.is_some());

        // install the new process and channels.
        self.process = Arc::new(Mutex::new(new_process));
        self.request_sender = request;
        self.response_receiver = response;

        let next_id = self.generation.next();
        self.generation = next_id;

        // the new listener reuses the callback and reports disconnects with the new generation.
        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
        self.event_listener = Some(ev);

        if let Err(ipc::ViewChannelError::Disconnected) = self.try_init() {
            panic!("respawn on respawn startup");
        }
    }
580}
581impl Drop for Controller {
582 fn drop(&mut self) {
584 let _ = self.exit();
585 #[cfg(ipc)]
586 if let Some(mut process) = self.process.lock().take()
587 && process.try_wait().is_err()
588 {
589 std::thread::sleep(Duration::from_secs(1));
590 if process.try_wait().is_err() {
591 tracing::error!("view-process did not exit after 1s, killing");
592 let _ = process.kill();
593 let _ = process.wait();
594 }
595 }
596 }
597}