1use std::{
2 collections::HashMap,
3 panic,
4 path::{Path, PathBuf},
5 sync::Arc,
6 thread::{self, JoinHandle},
7 time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_txt::Txt;
14
15use crate::{
16 AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
17 ipc::{self, EventReceiver},
18};
19
/// Join handle of the event listener thread.
///
/// The listener thread returns the boxed event callback when it ends, so joining the
/// handle gives the callback back to the controller (it is reused for the next listener
/// on respawn).
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
22
/// Name of the environment variable set on the spawned view-process with the API version (`crate::VERSION`).
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on the spawned view-process with the name of the IPC server it must connect to.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on the spawned view-process with the mode, `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
26
/// Connection state of the view-process, as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// Initial state, also set when a respawn starts. Left when the inited event of the
    /// current generation is handled.
    NotRunning,
    /// Set after the inited event is handled. This is the only state in which
    /// `Controller::is_connected` returns `true`.
    RunningAndConnected,
    /// Set by `Controller::handle_suspended`. The next inited event is accepted for any
    /// generation and moves the state back to connected.
    Suspended,
}
33
/// View-process controller.
///
/// Spawns (or connects to) the view-process, sends requests to it, forwards its events to a
/// callback running in a listener thread and respawns the view-process after a disconnect.
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    /// The spawned view-process and a flag that is `true` when the controller itself killed it.
    ///
    /// Is `None` in same-process mode. Shared with the listener thread, which polls the child
    /// when no event is received for some time.
    process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
    /// Current connection state.
    view_state: ViewState,
    /// Generation of the current view-process, advanced on every respawn.
    generation: ViewProcessGen,
    /// Set to `true` once a respawn starts, forwarded to the init request.
    is_respawn: bool,
    /// Executable used to spawn the view-process.
    view_process_exe: PathBuf,
    /// Extra environment variables set on the spawned view-process.
    view_process_env: HashMap<Txt, Txt>,
    /// Channel used to send requests to the view-process.
    request_sender: ipc::RequestSender,
    /// Channel used to receive the responses of requests.
    response_receiver: ipc::ResponseReceiver,
    /// Listener thread handle, taken and replaced during respawn.
    event_listener: Option<EventListenerJoin>,
    /// If the view-process was requested in headless mode.
    headless: bool,
    /// If no child process was spawned because the view runs in the same process.
    same_process: bool,
    /// Moment of the last respawn caused by a disconnect, reset to `None` by an explicit respawn.
    last_respawn: Option<Instant>,
    /// Count of disconnect respawns that happened less than a minute after the previous one.
    fast_respawn_count: u8,
}
/// Compile-time check (test builds only) that [`Controller`] is `Send + Sync`.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
66impl Controller {
67 pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
93 where
94 F: FnMut(Event) + Send + 'static,
95 {
96 Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
97 }
98 fn start_impl(
99 view_process_exe: PathBuf,
100 view_process_env: HashMap<Txt, Txt>,
101 headless: bool,
102 on_event: Box<dyn FnMut(Event) + Send>,
103 ) -> Self {
104 if ViewConfig::from_env().is_some() {
105 panic!("cannot start Controller in process configured to be view-process");
106 }
107
108 let (process, request_sender, response_receiver, event_receiver) =
109 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
110 let same_process = process.is_none();
111 let process = Arc::new(Mutex::new(process.map(|p| (p, false))));
112 let ev = if same_process {
113 Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
114 } else {
115 Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
116 };
117
118 let mut c = Controller {
119 same_process,
120 view_state: ViewState::NotRunning,
121 process,
122 view_process_exe,
123 view_process_env,
124 request_sender,
125 response_receiver,
126 event_listener: Some(ev),
127 headless,
128 generation: ViewProcessGen::first(),
129 is_respawn: false,
130 last_respawn: None,
131 fast_respawn_count: 0,
132 };
133
134 if let Err(ipc::ViewChannelError::Disconnected) = c.try_init() {
135 panic!("respawn on init");
136 }
137
138 c
139 }
140 fn spawn_same_process_listener(
141 mut on_event: Box<dyn FnMut(Event) + Send>,
142 mut event_receiver: EventReceiver,
143 generation: ViewProcessGen,
144 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
145 thread::Builder::new()
146 .name("same_process_listener".into())
147 .spawn(move || {
148 while let Ok(ev) = event_receiver.recv() {
149 on_event(ev);
150 }
151 on_event(Event::Disconnected(generation));
152
153 on_event
155 })
156 .expect("failed to spawn thread")
157 }
158 fn spawn_other_process_listener(
159 mut on_event: Box<dyn FnMut(Event) + Send>,
160 mut event_receiver: EventReceiver,
161 process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
162 generation: ViewProcessGen,
163 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
164 thread::Builder::new()
168 .name("other_process_listener".into())
169 .spawn(move || {
170 const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
171 let timeout = view_timeout();
172 let mut check_count = 0u64;
173 while let Ok(maybe) = event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
174 match maybe {
175 Some(ev) => {
176 check_count = 0;
177 on_event(ev)
178 }
179 None => {
180 if let Some(p) = &mut *process.lock() {
181 match p.0.try_wait() {
182 Ok(c) => {
183 if c.is_some() {
184 break;
186 } else {
187 check_count += 1;
188 if check_count == timeout {
189 tracing::error!("view-process not responding for {timeout}s, will respawn");
190 let _ = p.0.kill();
191 p.1 = true;
192 break;
193 }
194 }
195 }
196 Err(e) => {
197 if e.kind() != std::io::ErrorKind::Interrupted {
198 tracing::error!("view-process try_wait error after inactivity, {e}");
199 break;
200 }
201 }
202 }
203 } else {
204 break;
206 }
207 }
208 }
209 }
210 on_event(Event::Disconnected(generation));
211
212 on_event
214 })
215 .expect("failed to spawn thread")
216 }
217
/// Sends the init request with the current generation, respawn flag and headless flag.
///
/// Returns the channel error if the request fails. The `init` method is not defined in
/// this part of the file.
fn try_init(&mut self) -> VpResult<()> {
    self.init(self.generation, self.is_respawn, self.headless)?;
    Ok(())
}
222
/// Returns `true` if the view-process is running and connected.
///
/// Is `false` before the inited event is handled, while suspended and during a respawn.
pub fn is_connected(&self) -> bool {
    matches!(self.view_state, ViewState::RunningAndConnected)
}
227
/// Generation of the current view-process, it changes on every respawn.
pub fn generation(&self) -> ViewProcessGen {
    self.generation
}
232
/// Returns `true` if the view-process was started in headless mode.
pub fn headless(&self) -> bool {
    self.headless
}
237
/// Returns `true` if the view runs in the same process, that is, no child process was spawned.
pub fn same_process(&self) -> bool {
    self.same_process
}
242
243 fn disconnected_err(&self) -> Result<(), ipc::ViewChannelError> {
244 if self.is_connected() {
245 Ok(())
246 } else {
247 Err(ipc::ViewChannelError::Disconnected)
248 }
249 }
250
251 fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
252 self.request_sender.send(req)?;
253 self.response_receiver.recv()
254 }
255 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
256 debug_assert!(req.expect_response());
257
258 if req.must_be_connected() {
259 self.disconnected_err()?;
260 }
261
262 match self.try_talk(req) {
263 Ok(r) => Ok(r),
264 Err(ipc::ViewChannelError::Disconnected) => {
265 self.handle_disconnect(self.generation);
266 Err(ipc::ViewChannelError::Disconnected)
267 }
268 }
269 }
270
271 pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
272 debug_assert!(!req.expect_response());
273
274 if req.must_be_connected() {
275 self.disconnected_err()?;
276 }
277
278 match self.request_sender.send(req) {
279 Ok(_) => Ok(()),
280 Err(ipc::ViewChannelError::Disconnected) => {
281 self.handle_disconnect(self.generation);
282 Err(ipc::ViewChannelError::Disconnected)
283 }
284 }
285 }
286
287 fn spawn_view_process(
288 view_process_exe: &Path,
289 view_process_env: &HashMap<Txt, Txt>,
290 headless: bool,
291 ) -> AnyResult<(
292 Option<std::process::Child>,
293 ipc::RequestSender,
294 ipc::ResponseReceiver,
295 ipc::EventReceiver,
296 )> {
297 let _span = tracing::trace_span!("spawn_view_process").entered();
298
299 let init = ipc::AppInit::new();
300
301 let process = if ViewConfig::is_awaiting_same_process() {
303 ViewConfig::set_same_process(ViewConfig {
304 version: crate::VERSION.into(),
305 server_name: Txt::from_str(init.name()),
306 headless,
307 });
308 None
309 } else {
310 #[cfg(not(ipc))]
311 {
312 let _ = (view_process_exe, view_process_env);
313 panic!("expected only same_process mode with `ipc` feature disabled");
314 }
315
316 #[cfg(ipc)]
317 {
318 let mut process = std::process::Command::new(view_process_exe);
319 for (name, val) in view_process_env {
320 process.env(name, val);
321 }
322 let process = process
323 .env(VIEW_VERSION, crate::VERSION)
324 .env(VIEW_SERVER, init.name())
325 .env(VIEW_MODE, if headless { "headless" } else { "headed" })
326 .env("RUST_BACKTRACE", "full")
327 .spawn()?;
328 Some(process)
329 }
330 };
331
332 let (req, rsp, ev) = match init.connect() {
333 Ok(r) => r,
334 Err(e) => {
335 #[cfg(ipc)]
336 if let Some(mut p) = process {
337 if let Err(ke) = p.kill() {
338 tracing::error!(
339 "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
340 );
341 } else {
342 match p.wait() {
343 Ok(output) => {
344 let code = output.code();
345 if ViewConfig::is_version_err(code, None) {
346 let code = code.unwrap_or(1);
347 tracing::error!(
348 "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
349 will exit app-process with code 0x{code:x}"
350 );
351 zng_env::exit(code);
352 } else {
353 tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
354 }
355 }
356 Err(e) => {
357 tracing::error!("failed to read output status of killed view-process, {e}");
358 }
359 }
360 }
361 } else {
362 tracing::error!("failed to connect with same process");
363 }
364 return Err(e);
365 }
366 };
367
368 Ok((process, req, rsp, ev))
369 }
370
371 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
375 match self.view_state {
376 ViewState::NotRunning => {
377 if self.generation == vp_gen {
378 self.view_state = ViewState::RunningAndConnected;
380 }
381 }
382 ViewState::Suspended => {
383 self.generation = vp_gen;
384 self.view_state = ViewState::RunningAndConnected;
385 }
386 ViewState::RunningAndConnected => {}
387 }
388 }
389
/// Handles the suspended event, the controller is not connected until the next inited event.
pub fn handle_suspended(&mut self) {
    self.view_state = ViewState::Suspended;
}
396
397 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
417 if vp_gen == self.generation {
418 #[cfg(not(ipc))]
419 {
420 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
421 }
422
423 #[cfg(ipc)]
424 {
425 self.respawn_impl(true)
426 }
427 } else {
428 tracing::warn!("disconnected event from previous generation ignored")
429 }
430 }
431
/// Kills the current view-process and spawns a new one.
///
/// This is the explicit respawn, it is not counted as a crash. In builds without `ipc`
/// it only logs an error.
pub fn respawn(&mut self) {
    #[cfg(not(ipc))]
    {
        tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
    }

    #[cfg(ipc)]
    self.respawn_impl(false);
}
/// Replaces the current view-process with a new one.
///
/// `is_crash` is `true` when the respawn is caused by a channel disconnect and `false` when
/// it was explicitly requested.
///
/// Steps:
///
/// 1. Marks the controller as not running and takes the child process, returns early if
///    there is none (same-process mode).
/// 2. For a crash, counts respawns that happen less than a minute after the previous one.
/// 3. Kills the child for an explicit respawn. For a crash, waits 300ms for the child to
///    exit by itself before killing it.
/// 4. Inspects the exit status. If the child was not killed by the controller, some exit
///    codes and signals end the app-process. A version mismatch always ends the app-process.
/// 5. Joins the listener thread to recover the event callback, spawns the new view-process
///    (up to 3 attempts), advances the generation, starts a new listener and sends init.
///
/// # Panics
///
/// Panics if crash respawns repeat too quickly, if the listener thread panicked (the panic
/// is resumed), if the new view-process fails to spawn 3 times or if the channel
/// disconnects during the new init.
#[cfg(ipc)]
fn respawn_impl(&mut self, is_crash: bool) {
    use zng_unit::TimeUnits;

    self.view_state = ViewState::NotRunning;
    self.is_respawn = true;

    let Some((mut process, mut killed_by_us)) = self.process.lock().take() else {
        if self.same_process {
            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
        }
        return;
    };

    if is_crash {
        tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");

        let now = Instant::now();
        match self.last_respawn {
            Some(previous) if now - previous < Duration::from_secs(60) => {
                self.fast_respawn_count += 1;
                if self.fast_respawn_count == 2 {
                    panic!("disconnect respawn happened 2 times in less than 1 minute");
                }
            }
            Some(_) => self.fast_respawn_count = 0,
            None => {}
        }
        self.last_respawn = Some(now);
    } else {
        self.last_respawn = None;
    }

    let has_exited = |child: &mut std::process::Child| matches!(child.try_wait(), Ok(Some(_)));
    if is_crash {
        if !has_exited(&mut process) {
            // Give the process some time to finish exiting by itself.
            thread::sleep(300.ms());

            if !has_exited(&mut process) {
                killed_by_us = true;
                let _ = process.kill();
            }
        }
    } else {
        // Explicit respawn, the process is expected to be running.
        let _ = process.kill();
        killed_by_us = true;
    }

    match process.wait() {
        Ok(status) => {
            tracing::info!(target: "vp_respawn", "view-process killed");

            let code = status.code();
            // Only assigned in unix builds.
            #[allow(unused_mut)]
            let mut signal = None::<i32>;

            if !killed_by_us {
                #[cfg(windows)]
                if code == Some(1) {
                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
                                        will exit app-process with the same code");
                    zng_env::exit(1);
                }

                #[cfg(unix)]
                if code.is_none() {
                    use std::os::unix::process::ExitStatusExt as _;
                    signal = status.signal();

                    // Signals that also end the app-process.
                    if let Some(sig) = signal.filter(|s| [2, 9, 17, 19, 23].contains(s)) {
                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
                                            will exit app-process with code 1");
                        zng_env::exit(1);
                    }
                }

                let code = code.unwrap_or(0);
                let signal = signal.unwrap_or(0);
                tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
            }

            if ViewConfig::is_version_err(code, None) {
                let code = code.unwrap_or(1);
                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                        will exit app-process with code 0x{code:x}");
                zng_env::exit(code);
            }
        }
        Err(e) => {
            tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
        }
    }

    // The listener ends after the disconnect, joining it recovers the event callback.
    let listener = self.event_listener.take().unwrap();
    let on_event = match listener.join() {
        Ok(callback) => callback,
        Err(payload) => panic::resume_unwind(payload),
    };

    let mut retries_left = 3;
    let (new_process, request, response, event_receiver) = loop {
        let e = match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
            Ok(spawned) => break spawned,
            Err(e) => e,
        };
        tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
        retries_left -= 1;
        if retries_left == 0 {
            panic!("failed to respawn `view-process` after 3 retries");
        }
        tracing::info!(target: "vp_respawn", "retrying respawn");
    };

    self.process = Arc::new(Mutex::new(Some((new_process.unwrap(), false))));
    self.request_sender = request;
    self.response_receiver = response;

    self.generation = self.generation.next();

    let listener = Self::spawn_other_process_listener(on_event, event_receiver, Arc::clone(&self.process), self.generation);
    self.event_listener = Some(listener);

    if matches!(self.try_init(), Err(ipc::ViewChannelError::Disconnected)) {
        panic!("respawn on respawn startup");
    }
}
592}
593impl Drop for Controller {
594 fn drop(&mut self) {
596 let _ = self.exit();
597 #[cfg(ipc)]
598 if let Some((mut process, _)) = self.process.lock().take()
599 && process.try_wait().is_err()
600 {
601 std::thread::sleep(Duration::from_secs(1));
602 if process.try_wait().is_err() {
603 tracing::error!("view-process did not exit after 1s, killing");
604 let _ = process.kill();
605 let _ = process.wait();
606 }
607 }
608 }
609}
610
/// Name of the environment variable that overrides the view-process inactivity timeout, read by `view_timeout`.
const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
612pub(crate) fn view_timeout() -> u64 {
614 match std::env::var(VIEW_TIMEOUT) {
615 Ok(s) if !s.is_empty() => match s.parse::<u64>() {
616 Ok(s) => s.max(5),
617 Err(e) => {
618 tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
619 10
620 }
621 },
622 _ => 10,
623 }
624}