1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 collections::HashMap,
5 panic,
6 path::{Path, PathBuf},
7 sync::Arc,
8 thread::{self, JoinHandle},
9 time::Instant,
10};
11
12use std::time::Duration;
13
14use zng_task::channel::ChannelError;
15use zng_task::parking_lot::Mutex;
16use zng_txt::Txt;
17
18use crate::{
19 AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
20 ipc::{self, EventReceiver},
21};
22
/// Join handle of the event listener thread.
///
/// The listener thread returns the boxed event handler when it exits, so the handler can be
/// moved into a new listener thread after a respawn.
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;

/// Name of the environment variable set on a spawned view-process with the app-process `crate::VERSION`.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on a spawned view-process with the name of the IPC init server.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on a spawned view-process with `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
29
/// Connection state of the view-process as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// Initial state, also set when a respawn starts. Becomes `RunningAndConnected` when
    /// `handle_inited` is called with the current generation.
    NotRunning,
    /// Set by `handle_inited`, this is the only state for which `is_connected` returns `true`.
    RunningAndConnected,
    /// Set by `handle_suspended`. The next `handle_inited` call adopts the generation it receives.
    Suspended,
}
36
37#[cfg(ipc)]
38use zng_task::process::tap::StderrTap;
/// Placeholder used when the `ipc` cfg is disabled, so that types that mention `StderrTap` still compile.
#[cfg(not(ipc))]
struct StderrTap;
41
/// App-process side controller of the view-process.
///
/// Spawns (or connects to) the view-process, sends requests, runs the event listener thread and
/// respawns the view-process when the channel disconnects.
pub struct Controller {
    /// Child process, its stderr tap and a flag that is set to `true` when the listener thread
    /// killed the process for not responding.
    ///
    /// Is `None` in same-process mode and while a respawn is replacing the process.
    process: Arc<Mutex<Option<(std::process::Child, StderrTap, bool)>>>,
    /// Current connection state.
    view_state: ViewState,
    /// Generation of the current view-process, advanced on every respawn.
    generation: ViewProcessGen,
    /// Set to `true` once a respawn was started, forwarded to the `init` request.
    is_respawn: bool,
    /// Executable used to spawn the view-process.
    view_process_exe: PathBuf,
    /// Extra environment variables set on the spawned view-process.
    view_process_env: HashMap<Txt, Txt>,
    /// Sender of requests to the view-process.
    request_sender: ipc::RequestSender,
    /// Receiver of responses from the view-process.
    response_receiver: ipc::ResponseReceiver,
    /// Listener thread handle, taken and joined during respawn to recover the event handler.
    event_listener: Option<EventListenerJoin>,
    /// If the view-process is started in headless mode.
    headless: bool,
    /// If no child process was spawned, the view is expected to run in this same process.
    same_process: bool,
    /// Instant of the last respawn caused by a disconnect, cleared by a requested respawn.
    last_respawn: Option<Instant>,
    /// Count of disconnect respawns that happened less than 1 minute after the previous one.
    fast_respawn_count: u8,
}
// Compile-time check, only builds if `Controller` is `Send + Sync`.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
73impl Controller {
74 pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
100 where
101 F: FnMut(Event) + Send + 'static,
102 {
103 Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
104 }
105 fn start_impl(
106 view_process_exe: PathBuf,
107 view_process_env: HashMap<Txt, Txt>,
108 headless: bool,
109 on_event: Box<dyn FnMut(Event) + Send>,
110 ) -> Self {
111 if ViewConfig::from_env().is_some() {
112 panic!("cannot start Controller in process configured to be view-process");
113 }
114
115 let (process, request_sender, response_receiver, event_receiver) =
116 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
117 let same_process = process.is_none();
118 let process = Arc::new(Mutex::new(process.map(|(p, e)| (p, e, false))));
119 let ev = if same_process {
120 Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
121 } else {
122 #[cfg(ipc)]
123 {
124 Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
125 }
126 #[cfg(not(ipc))]
127 {
128 unreachable!()
129 }
130 };
131
132 let mut c = Controller {
133 same_process,
134 view_state: ViewState::NotRunning,
135 process,
136 view_process_exe,
137 view_process_env,
138 request_sender,
139 response_receiver,
140 event_listener: Some(ev),
141 headless,
142 generation: ViewProcessGen::first(),
143 is_respawn: false,
144 last_respawn: None,
145 fast_respawn_count: 0,
146 };
147
148 if let Err(ChannelError::Disconnected { .. }) = c.try_init() {
149 panic!("respawn on init");
150 }
151
152 c
153 }
154 fn spawn_same_process_listener(
155 mut on_event: Box<dyn FnMut(Event) + Send>,
156 mut event_receiver: EventReceiver,
157 generation: ViewProcessGen,
158 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
159 thread::Builder::new()
160 .name("same_process_listener".into())
161 .spawn(move || {
162 while let Ok(ev) = event_receiver.recv() {
163 on_event(ev);
164 }
165 on_event(Event::Disconnected(generation));
166
167 on_event
169 })
170 .expect("failed to spawn thread")
171 }
    /// Spawns the listener thread used when the view-process is a separate child process.
    ///
    /// The thread forwards every received event to `on_event`. Each time no event is received
    /// for 1 second it checks the child process. It stops if the process exited, if the process
    /// entry was removed, or if the process was killed here after `view_timeout()` consecutive
    /// idle checks.
    ///
    /// When the loop ends the thread notifies `Event::Disconnected(generation)` and returns the handler.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned.
    #[cfg(ipc)]
    fn spawn_other_process_listener(
        mut on_event: Box<dyn FnMut(Event) + Send>,
        mut event_receiver: EventReceiver,
        process: Arc<Mutex<Option<(std::process::Child, StderrTap, bool)>>>,
        generation: ViewProcessGen,
    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
        thread::Builder::new()
            .name("other_process_listener".into())
            .spawn(move || {
                // interval between process liveness checks while no event is received.
                const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
                let timeout = view_timeout();
                // consecutive idle checks, reset on every received event.
                let mut check_count = 0u64;
                loop {
                    match event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
                        Ok(ev) => {
                            check_count = 0;
                            on_event(ev)
                        }
                        Err(ChannelError::Timeout) => {
                            if let Some(p) = &mut *process.lock() {
                                match p.0.try_wait() {
                                    Ok(c) => {
                                        if c.is_some() {
                                            // view-process exited.
                                            break;
                                        } else {
                                            check_count += 1;
                                            if check_count == timeout {
                                                tracing::error!("view-process not responding for {timeout}s, will respawn");
                                                let _ = p.0.kill();
                                                // flag read by the respawn code as "killed by us".
                                                p.2 = true;
                                                break;
                                            }
                                        }
                                    }
                                    Err(e) => {
                                        // interrupted checks are retried on the next timeout.
                                        if e.kind() != std::io::ErrorKind::Interrupted {
                                            tracing::error!("view-process try_wait error after inactivity, {e}");
                                            break;
                                        }
                                    }
                                }
                            } else {
                                // process entry was taken, nothing left to watch.
                                break;
                            }
                        }
                        Err(_) => break,
                    }
                }
                on_event(Event::Disconnected(generation));

                on_event
            })
            .expect("failed to spawn thread")
    }
233
234 fn try_init(&mut self) -> VpResult<()> {
235 self.init(self.generation, self.is_respawn, self.headless)?;
236 Ok(())
237 }
238
239 pub fn is_connected(&self) -> bool {
241 matches!(self.view_state, ViewState::RunningAndConnected)
242 }
243
    /// Gets the generation of the current view-process.
    pub fn generation(&self) -> ViewProcessGen {
        self.generation
    }
248
    /// Gets the `headless` value the controller was started with.
    pub fn headless(&self) -> bool {
        self.headless
    }
253
    /// Returns `true` if no child process was spawned on start, that is, the view runs in this same process.
    pub fn same_process(&self) -> bool {
        self.same_process
    }
258
259 fn try_talk(&mut self, req: Request) -> Result<Response, ChannelError> {
260 self.request_sender.send(req)?;
261 self.response_receiver.recv()
262 }
263 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
264 debug_assert!(req.expect_response());
265
266 tracing::trace!("talk {req:?}");
267
268 if req.must_be_connected() && !self.is_connected() {
269 tracing::error!("cannot send request {req:?}, not connected");
270 return Err(ChannelError::disconnected());
271 }
272
273 match self.try_talk(req) {
274 Ok(r) => {
275 tracing::trace!("talk {r:?}");
276 Ok(r)
277 }
278 Err(ChannelError::Disconnected { cause }) => {
279 self.handle_disconnect(self.generation);
280 Err(ChannelError::Disconnected { cause })
281 }
282 e => e,
283 }
284 }
285
286 pub(crate) fn command(&mut self, req: Request) -> Result<(), ChannelError> {
287 debug_assert!(!req.expect_response());
288
289 tracing::trace!("command {req:?}");
290
291 if req.must_be_connected() && !self.is_connected() {
292 tracing::error!("cannot send request {req:?}, not connected");
293 return Err(ChannelError::disconnected());
294 }
295
296 match self.request_sender.send(req) {
297 Ok(_) => {
298 tracing::trace!("command ok");
299 Ok(())
300 }
301 Err(ChannelError::Disconnected { cause }) => {
302 self.handle_disconnect(self.generation);
303 Err(ChannelError::Disconnected { cause })
304 }
305 e => e,
306 }
307 }
308
    /// Spawns the view-process, or configures the same-process view, and connects to it.
    ///
    /// Returns the child process with its stderr tap (`None` in same-process mode) and the
    /// request, response and event channels.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned or the connection fails. On a
    /// connection failure the new child process is killed first. If its exit code indicates a
    /// version mismatch the app-process exits with that code.
    ///
    /// # Panics
    ///
    /// Panics if built without `ipc` and no same-process view is awaiting connection.
    #[allow(clippy::type_complexity)]
    fn spawn_view_process(
        view_process_exe: &Path,
        view_process_env: &HashMap<Txt, Txt>,
        headless: bool,
    ) -> AnyResult<(
        Option<(std::process::Child, StderrTap)>,
        ipc::RequestSender,
        ipc::ResponseReceiver,
        ipc::EventReceiver,
    )> {
        let _span = tracing::trace_span!("spawn_view_process").entered();

        let init = ipc::AppInit::new();

        let process = if ViewConfig::is_awaiting_same_process() {
            // same-process mode, hand the connection config over instead of spawning.
            ViewConfig::set_same_process(ViewConfig {
                version: crate::VERSION.into(),
                server_name: Txt::from_str(init.name()),
                headless,
            });
            None
        } else {
            #[cfg(not(ipc))]
            {
                let _ = (view_process_exe, view_process_env);
                panic!("expected only same_process mode with `ipc` feature disabled");
            }

            #[cfg(ipc)]
            {
                let mut process = std::process::Command::new(view_process_exe);
                for (name, val) in view_process_env {
                    process.env(name, val);
                }
                // the connection variables are set after the custom ones.
                let mut process = process
                    .env(VIEW_VERSION, crate::VERSION)
                    .env(VIEW_SERVER, init.name())
                    .env(VIEW_MODE, if headless { "headless" } else { "headed" })
                    .env("RUST_BACKTRACE", "full")
                    .stderr(std::process::Stdio::piped())
                    .spawn()?;

                let stderr = StderrTap::new_blocking(process.stderr.take().unwrap());

                Some((process, stderr))
            }
        };

        let (req, rsp, ev) = match init.connect() {
            Ok(r) => r,
            Err(e) => {
                // connection failed, cleanup the just spawned process before returning the error.
                #[cfg(ipc)]
                if let Some((mut p, _)) = process {
                    if let Err(ke) = p.kill() {
                        tracing::error!(
                            "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
                        );
                    } else {
                        match p.wait() {
                            Ok(output) => {
                                let code = output.code();
                                if ViewConfig::is_version_err(code, None) {
                                    let code = code.unwrap_or(1);
                                    tracing::error!(
                                        "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                        will exit app-process with code 0x{code:x}"
                                    );
                                    zng_env::exit(code);
                                } else {
                                    tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
                                }
                            }
                            Err(e) => {
                                tracing::error!("failed to read output status of killed view-process, {e}");
                            }
                        }
                    }
                } else {
                    tracing::error!("failed to connect with same process");
                }
                return Err(e);
            }
        };

        Ok((process, req, rsp, ev))
    }
397
398 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
402 match self.view_state {
403 ViewState::NotRunning => {
404 if self.generation == vp_gen {
405 self.view_state = ViewState::RunningAndConnected;
407 }
408 }
409 ViewState::Suspended => {
410 self.generation = vp_gen;
411 self.view_state = ViewState::RunningAndConnected;
412 }
413 ViewState::RunningAndConnected => {}
414 }
415 }
416
    /// Sets the connection state to suspended, `is_connected` returns `false` after this call.
    pub fn handle_suspended(&mut self) {
        self.view_state = ViewState::Suspended;
    }
423
424 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
444 if vp_gen == self.generation {
445 #[cfg(not(ipc))]
446 {
447 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
448 }
449
450 #[cfg(ipc)]
451 {
452 self.respawn_impl(true)
453 }
454 } else {
455 tracing::warn!("disconnected event from previous generation ignored")
456 }
457 }
458
    /// Kills the current view-process and spawns a new one, not counted as a crash.
    ///
    /// In builds without `ipc` this only logs an error.
    pub fn respawn(&mut self) {
        #[cfg(not(ipc))]
        {
            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
        }

        #[cfg(ipc)]
        self.respawn_impl(false);
    }
    /// Replaces the view-process with a newly spawned one.
    ///
    /// If `is_crash` is `true` the respawn was caused by a channel disconnect. In that case the
    /// old process is given 300ms to exit on its own before it is killed, and the respawn counts
    /// toward the fast respawn limit. If `false` the old process is killed immediately.
    ///
    /// Does nothing, apart from logging in same-process mode, if there is no child process to replace.
    ///
    /// # Panics
    ///
    /// Panics if disconnect respawns repeat too quickly, if the listener thread panicked, if the
    /// new view-process cannot be spawned after 3 attempts, or if the channel disconnects during
    /// the init of the new process.
    #[cfg(ipc)]
    fn respawn_impl(&mut self, is_crash: bool) {
        use zng_unit::TimeUnits;

        self.view_state = ViewState::NotRunning;
        self.is_respawn = true;

        // taking the process entry also makes the listener thread stop on its next idle check.
        let (mut process, stderr, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
            p
        } else {
            if self.same_process {
                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
            }
            return;
        };
        if is_crash {
            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
        }

        // track crash respawns that happen less than 1 minute apart.
        if is_crash {
            let t = Instant::now();
            if let Some(last_respawn) = self.last_respawn {
                if t - last_respawn < Duration::from_secs(60) {
                    self.fast_respawn_count += 1;
                    if self.fast_respawn_count == 2 {
                        panic!("disconnect respawn happened 2 times in less than 1 minute");
                    }
                } else {
                    self.fast_respawn_count = 0;
                }
            }
            self.last_respawn = Some(t);
        } else {
            self.last_respawn = None;
        }

        if !is_crash {
            // requested respawn, kill immediately.
            let _ = process.kill();
            killed_by_us = true;
        } else if !matches!(process.try_wait(), Ok(Some(_))) {
            // process has not exited yet, give it a moment to exit on its own.
            thread::sleep(300.ms());

            if !matches!(process.try_wait(), Ok(Some(_))) {
                killed_by_us = true;
                let _ = process.kill();
            }
        }

        let exit_status = match process.wait() {
            Ok(c) => Some(c),
            Err(e) => {
                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
                None
            }
        };

        if let Some(c) = exit_status {
            tracing::info!(target: "vp_respawn", "view-process killed");

            let code = c.code();
            #[allow(unused_mut)]
            let mut signal = None::<i32>;

            // exits not caused by this controller may mean the system is terminating the app.
            if !killed_by_us {
                #[cfg(windows)]
                if code == Some(1) {
                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
                                        will exit app-process with the same code");
                    zng_env::exit(1);
                }

                #[cfg(unix)]
                if code.is_none() {
                    use std::os::unix::process::ExitStatusExt as _;
                    signal = c.signal();

                    if let Some(sig) = signal
                        && [2, 9, 17, 19, 23].contains(&sig)
                    {
                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
                                            will exit app-process with code 1");
                        zng_env::exit(1);
                    }
                }
            }

            if !killed_by_us {
                let code = code.unwrap_or(0);
                let signal = signal.unwrap_or(0);
                // exit code 101 is handled as a panic, the message is read from the stderr tap.
                if code == 101
                    && let Ok(panic) = stderr.into_panic_blocking()
                {
                    tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}, panic: {}", panic.display_no_backtrace());
                } else {
                    tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
                }
            }

            if ViewConfig::is_version_err(code, None) {
                let code = code.unwrap_or(1);
                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                        will exit app-process with code 0x{code:x}");
                zng_env::exit(code);
            }
        } else {
            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
        }

        // recover the event handler from the old listener thread.
        let on_event = match self.event_listener.take().unwrap().join() {
            Ok(fn_) => fn_,
            Err(p) => panic::resume_unwind(p),
        };

        let mut retries = 3;
        let (new_process, request, response, event_listener) = loop {
            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
                Ok(r) => break r,
                Err(e) => {
                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
                    retries -= 1;
                    if retries == 0 {
                        panic!("failed to respawn `view-process` after 3 retries");
                    }
                    tracing::info!(target: "vp_respawn", "retrying respawn");
                }
            }
        };

        let (new_process, new_stderr) = new_process.unwrap();
        self.process = Arc::new(Mutex::new(Some((new_process, new_stderr, false))));
        self.request_sender = request;
        self.response_receiver = response;

        let next_id = self.generation.next();
        self.generation = next_id;

        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
        self.event_listener = Some(ev);

        if let Err(ChannelError::Disconnected { .. }) = self.try_init() {
            panic!("respawn on respawn startup");
        }
    }
626}
627impl Drop for Controller {
628 fn drop(&mut self) {
630 let _ = self.exit();
631 #[cfg(ipc)]
632 if let Some((mut process, _, _)) = self.process.lock().take()
633 && process.try_wait().is_err()
634 {
635 std::thread::sleep(Duration::from_secs(1));
636 if process.try_wait().is_err() {
637 tracing::error!("view-process did not exit after 1s, killing");
638 let _ = process.kill();
639 let _ = process.wait();
640 }
641 }
642 }
643}
644
/// Name of the environment variable read by `view_timeout`.
const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
/// Timeout returned by `view_timeout` when the variable is not set, empty or invalid.
const VIEW_TIMEOUT_DEFAULT: u64 = 20;
647pub(crate) fn view_timeout() -> u64 {
649 match std::env::var(VIEW_TIMEOUT) {
650 Ok(s) if !s.is_empty() => match s.parse::<u64>() {
651 Ok(s) => s.max(5),
652 Err(e) => {
653 if s == "false" {
654 return u64::MAX;
655 }
656 tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
657 VIEW_TIMEOUT_DEFAULT
658 }
659 },
660 _ => VIEW_TIMEOUT_DEFAULT,
661 }
662}